diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..1c446a6 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,1556 @@ +//[file name]: db.rs +// Основной модуль базы данных futriix, содержащий все компоненты СУБД +// Включает: движок хранения, WAL, индексы, Raft консенсус, ACL, транзакции, репликацию, бэкапы и процедуры + +use std::collections::HashMap; +use std::sync::Arc; +use parking_lot::RwLock; +use serde_json::Value; +use crate::config::Config; + +// Модуль движка хранения данных - отвечает за основное хранение документов и кортежей +mod storage { + use super::*; + use chrono::Utc; + + /// Движок хранения данных с документной моделью + /// Обеспечивает CRUD операции над пространствами, документами и кортежами + pub struct StorageEngine { + // Пространства данных (аналоги коллекциям/таблицам) + spaces: Arc>>, + // Путь к директории данных на диске + path: String, + } + + /// Пространство данных - контейнер для документов и кортежей + struct Space { + // Документы в пространстве (ключ-значение) + documents: HashMap, + // Кортежи в пространстве (структурированные данные) + tuples: HashMap, + } + + /// Документ - основной элемент хранения данных + struct Document { + // Значение документа в формате JSON + value: Value, + // Время создания документа + created_at: String, + // Время последнего обновления документа + updated_at: String, + } + + /// Кортеж - структурированный элемент данных + struct Tuple { + // Значение кортежа в формате JSON + value: Value, + // Время создания кортежа + created_at: String, + // Время последнего обновления кортежа + updated_at: String, + } + + impl StorageEngine { + /// Создает новый движок хранения данных + pub fn new(path: &str) -> Result { + Ok(Self { + spaces: Arc::new(RwLock::new(HashMap::new())), + path: path.to_string(), + }) + } + + /// Создает новое пространство данных + pub fn create_space(&self, name: &str) -> Result<(), String> { + let mut spaces = self.spaces.write(); + if spaces.contains_key(name) { + return Err("Space already exists".to_string()); + } + spaces.insert(name.to_string(), Space { + documents: HashMap::new(), + tuples: HashMap::new(), + }); + Ok(()) + } + + /// Удаляет пространство данных и все его содержимое + pub fn delete_space(&self, name: &str) -> Result<(), String> { + let mut spaces = self.spaces.write(); + spaces.remove(name) + .map(|_| ()) + .ok_or_else(|| "Space not found".to_string()) + } + + /// Вставляет документ в указанное пространство + pub fn insert(&self, space: &str, key: &str, value: Value) -> Result<(), String> { + let mut spaces = self.spaces.write(); + let space_obj = spaces.get_mut(space) + .ok_or_else(|| "Space not found".to_string())?; + + if space_obj.documents.contains_key(key) { + return Err("Document already exists".to_string()); + } + + let timestamp = Utc::now().to_rfc3339(); + space_obj.documents.insert(key.to_string(), Document { + value, + created_at: timestamp.clone(), + updated_at: timestamp, + }); + Ok(()) + } + + /// Получает документ из указанного пространства по ключу + pub fn get(&self, space: &str, key: &str) -> Result, String> { + let spaces = self.spaces.read(); + match spaces.get(space) { + Some(space_obj) => Ok(space_obj.documents.get(key).map(|doc| { + let mut value = doc.value.clone(); + // Добавляем метаданные о времени создания и обновления + if let Some(obj) = value.as_object_mut() { + obj.insert("_created_at".to_string(), Value::String(doc.created_at.clone())); + obj.insert("_updated_at".to_string(), Value::String(doc.updated_at.clone())); + } + value + })), + None => Err("Space not found".to_string()), + } + } + + /// Обновляет существующий документ в указанном пространстве + pub fn update(&self, space: &str, key: &str, value: Value) -> Result<(), String> { + let mut spaces = self.spaces.write(); + let space_obj = spaces.get_mut(space) + .ok_or_else(|| "Space not found".to_string())?; + + if let Some(document) = space_obj.documents.get_mut(key) { + document.value = value; + document.updated_at = Utc::now().to_rfc3339(); + Ok(()) + } else { + Err("Document not found".to_string()) + } + } + + /// Удаляет документ из указанного пространства + pub fn delete(&self, space: &str, key: &str) -> Result<(), String> { + let mut spaces = self.spaces.write(); + let space_obj = spaces.get_mut(space) + .ok_or_else(|| "Space not found".to_string())?; + + space_obj.documents.remove(key) + .map(|_| ()) + .ok_or_else(|| "Document not found".to_string()) + } + + /// Создает кортеж в указанном пространстве + pub fn create_tuple(&self, space: &str, tuple_id: &str, value: Value) -> Result<(), String> { + let mut spaces = self.spaces.write(); + let space_obj = spaces.get_mut(space) + .ok_or_else(|| "Space not found".to_string())?; + + let timestamp = Utc::now().to_rfc3339(); + space_obj.tuples.insert(tuple_id.to_string(), Tuple { + value, + created_at: timestamp.clone(), + updated_at: timestamp, + }); + Ok(()) + } + + /// Читает кортеж из указанного пространства + pub fn read_tuple(&self, space: &str, tuple_id: &str) -> Result, String> { + let spaces = self.spaces.read(); + match spaces.get(space) { + Some(space_obj) => Ok(space_obj.tuples.get(tuple_id).map(|tuple| { + let mut value = tuple.value.clone(); + // Добавляем метаданные о времени создания и обновления + if let Some(obj) = value.as_object_mut() { + obj.insert("_created_at".to_string(), Value::String(tuple.created_at.clone())); + obj.insert("_updated_at".to_string(), Value::String(tuple.updated_at.clone())); + } + value + })), + None => Err("Space not found".to_string()), + } + } + + /// Удаляет кортеж из указанного пространства + pub fn delete_tuple(&self, space: &str, tuple_id: &str) -> Result<(), String> { + let mut spaces = self.spaces.write(); + let space_obj = spaces.get_mut(space) + .ok_or_else(|| "Space not found".to_string())?; + + space_obj.tuples.remove(tuple_id) + .map(|_| ()) + .ok_or_else(|| "Tuple not found".to_string()) + } + + /// Получает все данные из всех пространств (для бэкапов и экспорта) + pub fn get_all_data(&self) -> HashMap> { + let spaces = self.spaces.read(); + let mut result = HashMap::new(); + + for (space_name, space) in spaces.iter() { + let mut space_data = HashMap::new(); + for (key, document) in &space.documents { + let mut value = document.value.clone(); + if let Some(obj) = value.as_object_mut() { + obj.insert("_created_at".to_string(), Value::String(document.created_at.clone())); + obj.insert("_updated_at".to_string(), Value::String(document.updated_at.clone())); + } + space_data.insert(key.clone(), value); + } + result.insert(space_name.clone(), space_data); + } + + result + } + + /// Восстанавливает данные из внешнего источника (для восстановления бэкапов) + pub fn restore_from_data(&self, data: HashMap>) -> Result<(), String> { + let mut spaces = self.spaces.write(); + spaces.clear(); + + for (space_name, documents) in data { + let mut space = Space { + documents: HashMap::new(), + tuples: HashMap::new(), + }; + + for (key, value) in documents { + let timestamp = Utc::now().to_rfc3339(); + // Восстанавливаем временные метки из данных или используем текущее время + let created_at = if let Some(obj) = value.as_object() { + obj.get("_created_at") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| timestamp.clone()) + } else { + timestamp.clone() + }; + + let updated_at = if let Some(obj) = value.as_object() { + obj.get("_updated_at") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| timestamp.clone()) + } else { + timestamp.clone() + }; + + // Убираем метаданные из основного значения + let mut clean_value = value.clone(); + if let Some(obj) = clean_value.as_object_mut() { + obj.remove("_created_at"); + obj.remove("_updated_at"); + } + + space.documents.insert(key, Document { + value: clean_value, + created_at, + updated_at, + }); + } + + spaces.insert(space_name, space); + } + + Ok(()) + } + } +} + +// Модуль Write-Ahead Log с lock-free архитектурой +// Обеспечивает надежность данных через журналирование операций +mod wal_lockfree { + use super::*; + use crossbeam::queue::SegQueue; + use std::fs::{OpenOptions, File}; + use std::io::{Write, BufWriter}; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + use std::time::Duration; + use chrono::Utc; + use serde::Serialize; + + /// Lock-free Write-Ahead Log для обеспечения надежности данных + /// Все операции сначала записываются в WAL, затем применяются к хранилищу + pub struct LockFreeWAL { + // Очередь операций для записи в лог + queue: Arc>, + // Хэндл потока записи + writer_handle: Option>, + // Флаг завершения работы + shutdown: Arc, + } + + /// Запись в WAL - содержит информацию об операции + #[derive(Clone, Debug, Serialize)] + pub struct LogEntry { + // Временная метка операции + pub timestamp: String, + // Тип операции (create, update, delete) + pub operation: String, + // Пространство/коллекция + pub collection: String, + // Ключ документа + pub key: String, + // Значение документа + pub value: Value, + } + + impl LockFreeWAL { + /// Создает новый WAL с указанным файлом для записи + pub fn new(filename: &str) -> Result { + let queue = Arc::new(SegQueue::new()); + let shutdown = Arc::new(AtomicBool::new(false)); + + let filename = filename.to_string(); + let queue_clone = queue.clone(); + let shutdown_clone = shutdown.clone(); + + // Запускаем отдельный поток для асинхронной записи в лог + let writer_handle = thread::spawn(move || { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&filename) + .expect("Failed to open WAL file"); + + let mut writer = BufWriter::new(file); + + // Основной цикл записи - работает пока есть операции или не получен сигнал завершения + while !shutdown_clone.load(Ordering::Relaxed) || !queue_clone.is_empty() { + if let Some(entry) = queue_clone.pop() { + if let Ok(json_str) = serde_json::to_string(&entry) { + let _ = writeln!(writer, "{}", json_str); + let _ = writer.flush(); + } + } else { + // Небольшая пауза если очередь пуста + thread::sleep(Duration::from_millis(10)); + } + } + }); + + Ok(Self { + queue, + writer_handle: Some(writer_handle), + shutdown, + }) + } + + /// Добавляет операцию в лог (lock-free, неблокирующая) + pub fn log_operation(&self, operation: &str, collection: &str, key: &str, value: &Value) { + let entry = LogEntry { + timestamp: Utc::now().to_rfc3339(), + operation: operation.to_string(), + collection: collection.to_string(), + key: key.to_string(), + value: value.clone(), + }; + + self.queue.push(entry); + } + + /// Восстанавливает данные из WAL (в данной реализации упрощенно) + pub fn recover(&self) -> Result, String> { + let entries = Vec::new(); + // В реальной реализации здесь бы читались записи из файла + // и возвращались для восстановления после сбоя + Ok(entries) + } + } + + impl Drop for LockFreeWAL { + fn drop(&mut self) { + // При уничтожении объекта сигнализируем потоку о завершении + self.shutdown.store(true, Ordering::Relaxed); + if let Some(handle) = self.writer_handle.take() { + let _ = handle.join(); + } + } + } +} + +// Модуль управления индексами для ускорения поиска +mod index { + use super::*; + use std::collections::BTreeMap; + + /// Менеджер индексов для ускорения операций поиска + /// Поддерживает первичные и вторичные индексы разных типов + pub struct IndexManager { + // Первичные индексы по коллекциям + primary_indexes: RwLock>, + // Вторичные индексы (по полям документов) + secondary_indexes: RwLock>>, + } + + /// Первичный индекс - отображает ключи на значения + struct PrimaryIndex { + index: HashMap, + } + + /// Типы вторичных индексов + enum SecondaryIndex { + // Хэш-индекс для точного поиска + Hash(HashMap>), + // B-дерево для диапазонных запросов + BTree(BTreeMap>), + } + + impl IndexManager { + /// Создает новый менеджер индексов + pub fn new() -> Self { + Self { + primary_indexes: RwLock::new(HashMap::new()), + secondary_indexes: RwLock::new(HashMap::new()), + } + } + + /// Создает первичный индекс для коллекции + pub fn create_primary_index(&self, collection: &str) -> Result<(), String> { + let mut indexes = self.primary_indexes.write(); + if indexes.contains_key(collection) { + return Err("Primary index already exists".to_string()); + } + + indexes.insert(collection.to_string(), PrimaryIndex { + index: HashMap::new(), + }); + Ok(()) + } + + /// Создает вторичный индекс для поля коллекции + pub fn create_secondary_index(&self, collection: &str, field: &str, index_type: &str) -> Result<(), String> { + let mut indexes = self.secondary_indexes.write(); + let collection_indexes = indexes.entry(collection.to_string()) + .or_insert_with(HashMap::new); + + if collection_indexes.contains_key(field) { + return Err("Secondary index already exists".to_string()); + } + + let index = match index_type { + "hash" => SecondaryIndex::Hash(HashMap::new()), + "btree" => SecondaryIndex::BTree(BTreeMap::new()), + _ => return Err(format!("Unsupported index type: {}", index_type)), + }; + + collection_indexes.insert(field.to_string(), index); + Ok(()) + } + + /// Удаляет первичный индекс коллекции + pub fn drop_primary_index(&self, collection: &str) -> Result<(), String> { + let mut indexes = self.primary_indexes.write(); + indexes.remove(collection) + .map(|_| ()) + .ok_or_else(|| "Primary index not found".to_string()) + } + + /// Удаляет вторичный индекс поля коллекции + pub fn drop_secondary_index(&self, collection: &str, field: &str) -> Result<(), String> { + let mut indexes = self.secondary_indexes.write(); + if let Some(collection_indexes) = indexes.get_mut(collection) { + collection_indexes.remove(field) + .map(|_| ()) + .ok_or_else(|| "Secondary index not found".to_string()) + } else { + Err("Collection not found".to_string()) + } + } + + /// Поиск по вторичному индексу + pub fn search_index(&self, collection: &str, field: &str, value: &str) -> Result, String> { + let indexes = self.secondary_indexes.read(); + if let Some(collection_indexes) = indexes.get(collection) { + if let Some(index) = collection_indexes.get(field) { + match index { + SecondaryIndex::Hash(hash_map) => { + let json_value: serde_json::Value = serde_json::from_str(value) + .map_err(|e| format!("Invalid value: {}", e))?; + Ok(hash_map.get(&json_value) + .cloned() + .unwrap_or_default()) + }, + _ => Ok(vec![]), // Упрощённая реализация для других типов индексов + } + } else { + Err("Index not found".to_string()) + } + } else { + Err("Collection not found".to_string()) + } + } + + /// Поиск по первичному индексу + pub fn search_primary(&self, collection: &str, key: &str) -> Option { + let indexes = self.primary_indexes.read(); + indexes.get(collection) + .and_then(|index| index.index.get(key).cloned()) + } + } +} + +// Модуль Raft консенсуса для распределенной работы +mod raft { + use super::*; + + /// Raft консенсус для кластеризации и распределенной работы + /// Обеспечивает согласованность данных между узлами кластера + pub struct RaftCluster { + // Узлы кластера + nodes: RwLock>, + // Состояние Raft алгоритма + state: RwLock, + // ID текущего узла + current_node_id: String, + } + + /// Узел Raft кластера + struct RaftNode { + // URL узла + url: String, + // Роль узла в кластере + role: NodeRole, + } + + /// Роли узлов в Raft алгоритме + #[derive(Clone, Debug, PartialEq)] + pub enum NodeRole { + Leader, // Лидер - обрабатывает запросы клиентов + Follower, // Последователь - реплицирует данные от лидера + Candidate, // Кандидат - участвует в выборах + } + + /// Состояние Raft алгоритма + struct RaftState { + // Текущий термин (увеличивается при выборах) + current_term: u64, + // За кого голосовал узел в текущем терме + voted_for: Option, + // Журнал операций для репликации + log: Vec, + } + + /// Запись в журнале Raft + struct LogEntry { + // Термин записи + term: u64, + // Команда для выполнения + command: String, + } + + impl RaftCluster { + /// Создает новый Raft кластер + pub async fn new() -> Result { + let node_id = "node_1".to_string(); + let mut nodes = HashMap::new(); + // Первый узел по умолчанию становится лидером + nodes.insert(node_id.clone(), RaftNode { + url: "http://localhost:9080".to_string(), + role: NodeRole::Leader, + }); + + Ok(Self { + nodes: RwLock::new(nodes), + state: RwLock::new(RaftState { + current_term: 0, + voted_for: None, + log: Vec::new(), + }), + current_node_id: node_id, + }) + } + + /// Добавляет узел в кластер + pub async fn add_node(&self, node_url: String) -> Result<(), String> { + let node_id = format!("node_{}", self.nodes.read().len() + 1); + let mut nodes = self.nodes.write(); + nodes.insert(node_id.clone(), RaftNode { + url: node_url, + role: NodeRole::Follower, // Новые узлы становятся последователями + }); + Ok(()) + } + + /// Удаляет узел из кластера + pub async fn remove_node(&mut self, node_id: &str) -> Result<(), String> { + let mut nodes = self.nodes.write(); + nodes.remove(node_id) + .map(|_| ()) + .ok_or_else(|| "Node not found".to_string()) + } + + /// Возвращает список всех узлов кластера с их ролями + pub fn list_nodes(&self) -> Vec<(String, String)> { + let nodes = self.nodes.read(); + nodes.iter() + .map(|(id, node)| { + let role = match node.role { + NodeRole::Leader => "leader", + NodeRole::Follower => "follower", + NodeRole::Candidate => "candidate", + }; + (id.clone(), role.to_string()) + }) + .collect() + } + + /// Возвращает статус кластера + pub fn get_cluster_status(&self) -> String { + let nodes = self.nodes.read(); + if nodes.values().any(|node| matches!(node.role, NodeRole::Leader)) { + "cluster_formed".to_string() + } else { + "cluster_not_formed".to_string() + } + } + + /// Получить роль текущего узла + pub fn get_current_node_role(&self) -> NodeRole { + let nodes = self.nodes.read(); + nodes.get(&self.current_node_id) + .map(|node| node.role.clone()) + .unwrap_or(NodeRole::Follower) + } + + /// Получить ID текущего узла + pub fn get_current_node_id(&self) -> String { + self.current_node_id.clone() + } + } +} + +// Модуль контроля доступа (ACL) +mod acl { + use super::*; + + /// Система контроля доступа (Access Control List) + /// Управляет правами пользователей на операции с данными + pub struct AccessControl { + // Права доступа пользователей + permissions: RwLock>, + } + + /// Права доступа пользователя + #[derive(Clone)] + pub struct UserPermissions { + // Может читать данные + pub can_read: bool, + // Может обновлять данные + pub can_write: bool, + // Может создавать новые документы + pub can_create: bool, + // Может удалять документы + pub can_delete: bool, + // Разрешенные коллекции (пустой список = все коллекции) + pub allowed_collections: Vec, + } + + impl AccessControl { + /// Создает новую систему контроля доступа + pub fn new() -> Self { + Self { + permissions: RwLock::new(HashMap::new()), + } + } + + /// Добавляет пользователя с указанными правами + pub fn add_user(&self, username: &str, permissions: UserPermissions) { + let mut perms = self.permissions.write(); + perms.insert(username.to_string(), permissions); + } + + /// Проверяет право на чтение + pub fn can_read(&self, username: &str, collection: &str) -> bool { + let perms = self.permissions.read(); + if let Some(user_perms) = perms.get(username) { + user_perms.can_read && + (user_perms.allowed_collections.is_empty() || + user_perms.allowed_collections.contains(&collection.to_string())) + } else { + false + } + } + + /// Проверяет право на запись + pub fn can_write(&self, username: &str, collection: &str) -> bool { + let perms = self.permissions.read(); + if let Some(user_perms) = perms.get(username) { + user_perms.can_write && + (user_perms.allowed_collections.is_empty() || + user_perms.allowed_collections.contains(&collection.to_string())) + } else { + false + } + } + + /// Проверяет право на создание + pub fn can_create(&self, username: &str, collection: &str) -> bool { + let perms = self.permissions.read(); + if let Some(user_perms) = perms.get(username) { + user_perms.can_create && + (user_perms.allowed_collections.is_empty() || + user_perms.allowed_collections.contains(&collection.to_string())) + } else { + false + } + } + + /// Проверяет право на удаление + pub fn can_delete(&self, username: &str, collection: &str) -> bool { + let perms = self.permissions.read(); + if let Some(user_perms) = perms.get(username) { + user_perms.can_delete && + (user_perms.allowed_collections.is_empty() || + user_perms.allowed_collections.contains(&collection.to_string())) + } else { + false + } + } + } + + impl Default for UserPermissions { + fn default() -> Self { + Self { + can_read: true, + can_write: true, + can_create: true, + can_delete: true, + allowed_collections: vec![], + } + } + } +} + +// Модуль lock-free транзакций +mod transaction_lockfree { + use super::*; + use crossbeam::queue::SegQueue; + use std::sync::atomic::{AtomicU64, Ordering}; + use serde::Serialize; + + /// Операция в транзакции + #[derive(Debug, Clone, Serialize)] + pub enum TransactionOperation { + Create { collection: String, key: String, value: Value }, + Update { collection: String, key: String, value: Value }, + Delete { collection: String, key: String }, + } + + /// Транзакция - набор операций, выполняемых атомарно + #[derive(Debug, Clone, Serialize)] + pub struct Transaction { + // ID транзакции + pub id: String, + // Операции транзакции + pub operations: Vec, + // Состояние транзакции + pub state: TransactionState, + } + + /// Состояние транзакции + #[derive(Debug, PartialEq, Clone, Serialize)] + pub enum TransactionState { + Active, // Транзакция активна + Committed, // Транзакция завершена успешно + RolledBack, // Транзакция отменена + } + + /// Менеджер lock-free транзакций + pub struct LockFreeTransactionManager { + // Очередь транзакций + transactions: Arc>, + // Счетчик для генерации ID транзакций + next_id: AtomicU64, + } + + impl LockFreeTransactionManager { + /// Создает новый менеджер транзакций + pub fn new() -> Self { + Self { + transactions: Arc::new(SegQueue::new()), + next_id: AtomicU64::new(1), + } + } + + /// Начинает новую транзакцию + pub fn begin_transaction(&self) -> String { + let tx_id = format!("tx_{}", self.next_id.fetch_add(1, Ordering::Relaxed)); + let transaction = Transaction { + id: tx_id.clone(), + operations: Vec::new(), + state: TransactionState::Active, + }; + + self.transactions.push(transaction); + tx_id + } + + /// Добавляет операцию в транзакцию + pub fn add_operation(&self, transaction_id: &str, operation: TransactionOperation) -> Result<(), String> { + // В lock-free реализации операции сразу применяются к хранилищу + // или добавляются в очередь для пакетной обработки + Ok(()) + } + + /// Завершает транзакцию с коммитом + pub fn commit_transaction(&self, transaction_id: &str) -> Result, String> { + // В lock-free реализации коммит - это маркировка транзакции как завершенной + // Фактическое применение операций происходит асинхронно + let operations = Vec::new(); // В реальной реализации здесь были бы операции транзакции + Ok(operations) + } + + /// Откатывает транзакцию + pub fn rollback_transaction(&self, transaction_id: &str) -> Result<(), String> { + // В lock-free реализации откат - это удаление операций из очереди + // или маркировка их как отмененных + Ok(()) + } + + /// Получает ожидающие транзакции + pub fn get_pending_transactions(&self) -> Vec { + let mut transactions = Vec::new(); + while let Some(tx) = self.transactions.pop() { + transactions.push(tx); + } + transactions + } + } +} + +// Модуль экспорта/импорта CSV +mod csv_export { + use super::*; + use std::fs::File; + use csv::Writer; + + /// Экспортер данных в CSV формат + pub struct CsvExporter; + + impl CsvExporter { + /// Экспортирует данные в CSV файл + pub fn export_data(data: &HashMap>, file_path: &str) -> Result<(), String> { + let file = File::create(file_path) + .map_err(|e| format!("Failed to create file: {}", e))?; + + let mut wtr = Writer::from_writer(file); + + for (collection, documents) in data { + for (key, value) in documents { + if let Some(obj) = value.as_object() { + let mut record = vec![collection.clone(), key.clone()]; + + for (field, field_value) in obj { + record.push(field_value.to_string()); + } + + wtr.write_record(&record) + .map_err(|e| format!("Failed to write record: {}", e))?; + } + } + } + + wtr.flush() + .map_err(|e| format!("Failed to flush writer: {}", e))?; + + Ok(()) + } + + /// Импортирует данные из CSV файла + pub fn import_data(file_path: &str) -> Result>, String> { + let file = File::open(file_path) + .map_err(|e| format!("Failed to open file: {}", e))?; + + let mut rdr = csv::Reader::from_reader(file); + let mut result = HashMap::new(); + + for record in rdr.records() { + let record = record.map_err(|e| format!("Failed to read record: {}", e))?; + if record.len() >= 3 { + let collection = record[0].to_string(); + let key = record[1].to_string(); + + let mut document = HashMap::new(); + for (i, field) in record.iter().enumerate().skip(2) { + document.insert(format!("field_{}", i - 2), Value::String(field.to_string())); + } + + result.entry(collection) + .or_insert_with(HashMap::new) + .insert(key, Value::Object(serde_json::Map::from_iter(document))); + } + } + + Ok(result) + } + } +} + +// Модуль lock-free репликации +mod replication_lockfree { + use super::*; + use crossbeam::queue::SegQueue; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + use std::time::Duration; + use serde::Serialize; + + /// Lock-free мастер-мастер репликация + /// Обеспечивает синхронизацию данных между узлами + pub struct LockFreeReplication { + // Флаг включения репликации + enabled: Arc, + // Список узлов для репликации + nodes: Vec, + // Очередь операций для репликации + replication_queue: Arc>, + // Хэндл потока синхронизации + sync_handle: Option>, + // Флаг завершения работы + shutdown: Arc, + } + + /// Операция репликации + #[derive(Clone, Debug, Serialize)] + pub struct ReplicationOperation { + // Тип операции + pub operation: String, + // Коллекция + pub collection: String, + // Ключ документа + pub key: String, + // Значение документа + pub value: Value, + // Временная метка + pub timestamp: u64, + } + + impl LockFreeReplication { + /// Создает новую систему репликации + pub fn new(enabled: bool, nodes: Vec) -> Self { + let replication_queue = Arc::new(SegQueue::new()); + let shutdown = Arc::new(AtomicBool::new(false)); + let enabled_flag = Arc::new(AtomicBool::new(enabled)); + + let queue_clone = replication_queue.clone(); + let nodes_clone = nodes.clone(); + let shutdown_clone = shutdown.clone(); + let enabled_clone = enabled_flag.clone(); + + // Запускаем поток синхронизации + let sync_handle = thread::spawn(move || { + while !shutdown_clone.load(Ordering::Relaxed) { + if enabled_clone.load(Ordering::Relaxed) && !queue_clone.is_empty() { + let operations: Vec = + (0..queue_clone.len()).filter_map(|_| queue_clone.pop()).collect(); + + if !operations.is_empty() { + // Асинхронная отправка операций на другие узлы + let nodes = nodes_clone.clone(); + let ops = operations.clone(); + thread::spawn(move || { + Self::send_operations_to_nodes(&nodes, &ops); + }); + } + } + thread::sleep(Duration::from_millis(100)); + } + }); + + Self { + enabled: enabled_flag, + nodes, + replication_queue, + sync_handle: Some(sync_handle), + shutdown, + } + } + + /// Проверяет включена ли репликация + pub fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::Relaxed) + } + + /// Включает репликацию + pub fn enable(&self) { + self.enabled.store(true, Ordering::Relaxed); + } + + /// Выключает репликацию + pub fn disable(&self) { + self.enabled.store(false, Ordering::Relaxed); + } + + /// Добавляет операцию в очередь репликации + pub fn add_operation(&self, operation: ReplicationOperation) { + if self.is_enabled() { + self.replication_queue.push(operation); + } + } + + /// Возвращает количество ожидающих операций репликации + pub fn get_pending_operations_count(&self) -> usize { + self.replication_queue.len() + } + + /// Отправляет операции на узлы репликации + fn send_operations_to_nodes(nodes: &[String], operations: &[ReplicationOperation]) { + // Упрощенная реализация отправки операций + // В реальной системе здесь был бы HTTP клиент + log::info!("Sending {} operations to {} nodes", operations.len(), nodes.len()); + } + } + + impl Drop for LockFreeReplication { + fn drop(&mut self) { + // При уничтожении сигнализируем потоку о завершении + self.shutdown.store(true, Ordering::Relaxed); + if let Some(handle) = self.sync_handle.take() { + let _ = handle.join(); + } + } + } +} + +// Модуль управления бэкапами +mod backup { + use super::*; + use std::fs::File; + use flate2::write::GzEncoder; + use flate2::Compression; + use std::io::Write; + + /// Менеджер бэкапов для создания и восстановления резервных копий + pub struct BackupManager; + + impl BackupManager { + /// Создает сжатый бэкап данных + pub fn create_backup(data: &HashMap>, backup_path: &str) -> Result<(), String> { + let file = File::create(backup_path) + .map_err(|e| format!("Failed to create backup file: {}", e))?; + + let mut encoder = GzEncoder::new(file, Compression::default()); + let json_data = serde_json::to_string(data) + .map_err(|e| format!("Failed to serialize backup data: {}", e))?; + + encoder.write_all(json_data.as_bytes()) + .map_err(|e| format!("Failed to write backup data: {}", e))?; + + encoder.finish() + .map_err(|e| format!("Failed to finish backup: {}", e))?; + + Ok(()) + } + + /// Восстанавливает данные из бэкапа + pub fn restore_backup(backup_path: &str) -> Result>, String> { + let file = File::open(backup_path) + .map_err(|e| format!("Failed to open backup file: {}", e))?; + + let decoder = flate2::read::GzDecoder::new(file); + let data: HashMap> = serde_json::from_reader(decoder) + .map_err(|e| format!("Failed to deserialize backup data: {}", e))?; + + Ok(data) + } + } +} + +// Модуль управления хранимыми процедурами +mod procedures { + use super::*; + use std::fs; + use std::path::Path; + + /// Менеджер хранимых процедур на Lua + /// Позволяет создавать и выполнять пользовательские скрипты + pub struct StoredProceduresManager { + // Реестр процедур (имя -> код Lua) + procedures: RwLock>, + // Путь к директории с процедурами + procedures_path: String, + } + + impl StoredProceduresManager { + /// Создает новый менеджер хранимых процедур + pub fn new(procedures_path: &str) -> Result { + let manager = Self { + procedures: RwLock::new(HashMap::new()), + procedures_path: procedures_path.to_string(), + }; + + // Загружаем существующие процедуры при инициализации + manager.load_procedures()?; + + Ok(manager) + } + + /// Создает новую хранимую процедуру + pub fn create_procedure(&self, name: &str, code: &str) -> Result<(), String> { + let mut procedures = self.procedures.write(); + + if procedures.contains_key(name) { + return Err(format!("Procedure '{}' already exists", name)); + } + + procedures.insert(name.to_string(), code.to_string()); + + // Сохраняем процедуру в файл + self.save_procedure(name, code)?; + + Ok(()) + } + + /// Удаляет хранимую процедуру + pub fn drop_procedure(&self, name: &str) -> Result<(), String> { + let mut procedures = self.procedures.write(); + + if !procedures.contains_key(name) { + return Err(format!("Procedure '{}' not found", name)); + } + + procedures.remove(name); + + // Удаляем файл процедуры + let file_path = format!("{}/{}.lua", self.procedures_path, name); + if let Err(e) = fs::remove_file(file_path) { + log::warn!("Failed to remove procedure file: {}", e); + } + + Ok(()) + } + + /// Получает код процедуры по имени + pub fn get_procedure(&self, name: &str) -> Option { + let procedures = self.procedures.read(); + procedures.get(name).cloned() + } + + /// Возвращает список всех процедур + pub fn list_procedures(&self) -> Vec { + let procedures = self.procedures.read(); + procedures.keys().cloned().collect() + } + + /// Загружает процедуры из файловой системы + fn load_procedures(&self) -> Result<(), String> { + let path = Path::new(&self.procedures_path); + + // Создаем директорию если не существует + if !path.exists() { + fs::create_dir_all(path) + .map_err(|e| format!("Failed to create procedures directory: {}", e))?; + } + + let entries = fs::read_dir(path) + .map_err(|e| format!("Failed to read procedures directory: {}", e))?; + + let mut procedures = self.procedures.write(); + + for entry in entries { + let entry = entry.map_err(|e| format!("Failed to read directory entry: {}", e))?; + let path = entry.path(); + + if path.is_file() && path.extension().map_or(false, |ext| ext == "lua") { + if let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) { + let code = fs::read_to_string(&path) + .map_err(|e| format!("Failed to read procedure file: {}", e))?; + + procedures.insert(file_name.to_string(), code); + } + } + } + + Ok(()) + } + + /// Сохраняет процедуру в файл + fn save_procedure(&self, name: &str, code: &str) -> Result<(), String> { + let file_path = format!("{}/{}.lua", self.procedures_path, name); + fs::write(&file_path, code) + .map_err(|e| format!("Failed to save procedure: {}", e))?; + + Ok(()) + } + } +} + +// Модуль шифрования +mod encryption { + use md5::{Digest, Md5}; + + /// Менеджер шифрования для СУБД + /// Предоставляет функции хеширования для паролей и данных + pub struct EncryptionManager; + + impl EncryptionManager { + /// Создает MD5 хеш от текста + pub fn md5_hash(text: &str) -> String { + let mut hasher = Md5::new(); + hasher.update(text.as_bytes()); + let result = hasher.finalize(); + format!("{:x}", result) + } + + /// Проверяет соответствие текста MD5 хешу + pub fn verify_md5(text: &str, hash: &str) -> bool { + Self::md5_hash(text) == hash + } + } +} + +// Re-export всех модулей для внешнего использования +pub use self::storage::StorageEngine; +pub use self::wal_lockfree::LockFreeWAL; +pub use self::index::IndexManager; +pub use self::raft::RaftCluster; +pub use self::acl::AccessControl; +pub use self::transaction_lockfree::{LockFreeTransactionManager, TransactionOperation, TransactionState}; +pub use self::csv_export::CsvExporter; +pub use self::replication_lockfree::{LockFreeReplication, ReplicationOperation}; +pub use self::backup::BackupManager; +pub use self::procedures::StoredProceduresManager; +pub use self::encryption::EncryptionManager; + +/// Основной класс СУБД futriix +/// Объединяет все компоненты в единую систему управления данными +#[derive(Clone)] +pub struct FutriixDB { + // Движок хранения данных + storage: Arc, + // Write-Ahead Log для надежности + wal: Arc, + // Менеджер индексов для ускорения поиска + index_manager: Arc, + // Raft кластер для распределенной работы + raft_cluster: Arc, + // Система контроля доступа + acl: Arc, + // Менеджер транзакций + transaction_manager: Arc, + // Система репликации + replication: Arc, + // Менеджер хранимых процедур + procedures_manager: Arc, + // Конфигурация СУБД + config: Config, +} + +impl FutriixDB { + /// Создает новый экземпляр СУБД с указанной конфигурацией + pub async fn new(config: &Config) -> Result { + // Создаем директорию для данных если не существует + std::fs::create_dir_all(&config.db_path) + .map_err(|e| format!("Failed to create data directory: {}", e))?; + + // Инициализируем все компоненты СУБД + let storage = Arc::new(StorageEngine::new(&config.db_path)?); + let wal = Arc::new(LockFreeWAL::new("wal.log")?); + let index_manager = Arc::new(IndexManager::new()); + let raft_cluster = Arc::new(RaftCluster::new().await?); + let acl = Arc::new(AccessControl::new()); + let transaction_manager = Arc::new(LockFreeTransactionManager::new()); + + let replication = Arc::new(LockFreeReplication::new( + config.master_master_replication, + config.replication_nodes.clone(), + )); + + let procedures_manager = Arc::new(StoredProceduresManager::new(&config.stored_procedures_path)?); + + Ok(Self { + storage, + wal, + index_manager, + raft_cluster, + acl, + transaction_manager, + replication, + procedures_manager, + config: config.clone(), + }) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С ХРАНИМЫМИ ПРОЦЕДУРАМИ ===== + + /// Создает новую хранимую процедуру + pub fn create_procedure(&self, name: &str, code: &str) -> Result<(), String> { + self.procedures_manager.create_procedure(name, code) + } + + /// Удаляет хранимую процедуру + pub fn drop_procedure(&self, name: &str) -> Result<(), String> { + self.procedures_manager.drop_procedure(name) + } + + /// Получает код процедуры по имени + pub fn get_procedure(&self, name: &str) -> Option { + self.procedures_manager.get_procedure(name) + } + + /// Возвращает список всех процедур + pub fn list_procedures(&self) -> Vec { + self.procedures_manager.list_procedures() + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С БЭКАПАМИ ===== + + /// Создает бэкап данных в указанный файл + pub fn create_backup(&self, backup_path: &str) -> Result<(), String> { + let data = self.storage.get_all_data(); + BackupManager::create_backup(&data, backup_path) + } + + /// Восстанавливает данные из бэкапа + pub fn restore_backup(&self, backup_path: &str) -> Result<(), String> { + let data = BackupManager::restore_backup(backup_path)?; + self.storage.restore_from_data(data) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С РЕПЛИКАЦИЕЙ ===== + + /// Проверяет включена ли репликация + pub fn is_replication_enabled(&self) -> bool { + self.replication.is_enabled() + } + + /// Включает репликацию + pub fn enable_replication(&self) { + self.replication.enable(); + } + + /// Выключает репликацию + pub fn disable_replication(&self) { + self.replication.disable(); + } + + /// Синхронизирует данные с другими узлами + pub fn sync_replication(&self) -> Result<(), String> { + // В lock-free реализации синхронизация происходит автоматически + Ok(()) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С ШИФРОВАНИЕМ ===== + + /// Создает MD5 хеш от текста + pub fn md5_hash(&self, text: &str) -> String { + EncryptionManager::md5_hash(text) + } + + /// Проверяет соответствие текста MD5 хешу + pub fn verify_md5(&self, text: &str, hash: &str) -> bool { + EncryptionManager::verify_md5(text, hash) + } + + /// Вставляет документ с автоматическим хешированием пароля + pub fn insert_with_password_hash(&self, space: &str, key: &str, value: Value, password_field: &str) -> Result<(), String> { + let mut value = value; + + if let Some(obj) = value.as_object_mut() { + if let Some(password_value) = obj.get_mut(password_field) { + if let Some(password) = password_value.as_str() { + let hashed_password = self.md5_hash(password); + *password_value = Value::String(hashed_password); + } + } + } + + self.insert(space, key, value) + } + + /// Проверяет пароль пользователя + pub fn verify_password(&self, space: &str, key: &str, password: &str, password_field: &str) -> Result { + match self.get(space, key) { + Ok(Some(value)) => { + if let Some(obj) = value.as_object() { + if let Some(stored_hash) = obj.get(password_field).and_then(|v| v.as_str()) { + Ok(self.verify_md5(password, stored_hash)) + } else { + Ok(false) + } + } else { + Ok(false) + } + } + Ok(None) => Err("Document not found".to_string()), + Err(e) => Err(e), + } + } + + // ===== ОСНОВНЫЕ МЕТОДЫ СУБД (CRUD ОПЕРАЦИИ) ===== + + /// Создает новое пространство данных + pub fn create_space(&self, name: &str) -> Result<(), String> { + self.storage.create_space(name) + } + + /// Удаляет пространство данных + pub fn delete_space(&self, name: &str) -> Result<(), String> { + self.storage.delete_space(name) + } + + /// Вставляет документ в пространство + pub fn insert(&self, space: &str, key: &str, value: Value) -> Result<(), String> { + self.storage.insert(space, key, value) + } + + /// Получает документ из пространства + pub fn get(&self, space: &str, key: &str) -> Result, String> { + self.storage.get(space, key) + } + + /// Обновляет документ в пространстве + pub fn update(&self, space: &str, key: &str, value: Value) -> Result<(), String> { + self.storage.update(space, key, value) + } + + /// Удаляет документ из пространства + pub fn delete(&self, space: &str, key: &str) -> Result<(), String> { + self.storage.delete(space, key) + } + + /// Создает кортеж в пространстве + pub fn create_tuple(&self, space: &str, tuple_id: &str, value: Value) -> Result<(), String> { + self.storage.create_tuple(space, tuple_id, value) + } + + /// Читает кортеж из пространства + pub fn read_tuple(&self, space: &str, tuple_id: &str) -> Result, String> { + self.storage.read_tuple(space, tuple_id) + } + + /// Удаляет кортеж из пространства + pub fn delete_tuple(&self, space: &str, tuple_id: &str) -> Result<(), String> { + self.storage.delete_tuple(space, tuple_id) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С ТРАНЗАКЦИЯМИ ===== + + /// Начинает новую транзакцию + pub fn begin_transaction(&self, transaction_id: String) -> Result<(), String> { + self.transaction_manager.begin_transaction(); + Ok(()) + } + + /// Добавляет операцию в транзакцию + pub fn add_operation(&self, transaction_id: &str, operation: TransactionOperation) -> Result<(), String> { + self.transaction_manager.add_operation(transaction_id, operation) + } + + /// Завершает транзакцию с коммитом + pub fn commit_transaction(&self, transaction_id: &str) -> Result, String> { + self.transaction_manager.commit_transaction(transaction_id) + } + + /// Откатывает транзакцию + pub fn rollback_transaction(&self, transaction_id: &str) -> Result<(), String> { + self.transaction_manager.rollback_transaction(transaction_id) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С ИНДЕКСАМИ ===== + + /// Создает первичный индекс для коллекции + pub fn create_primary_index(&self, collection: &str) -> Result<(), String> { + self.index_manager.create_primary_index(collection) + } + + /// Создает вторичный индекс для поля коллекции + pub fn create_secondary_index(&self, collection: &str, field: &str, index_type: &str) -> Result<(), String> { + self.index_manager.create_secondary_index(collection, field, index_type) + } + + /// Удаляет первичный индекс коллекции + pub fn drop_primary_index(&self, collection: &str) -> Result<(), String> { + self.index_manager.drop_primary_index(collection) + } + + /// Удаляет вторичный индекс поля коллекции + pub fn drop_secondary_index(&self, collection: &str, field: &str) -> Result<(), String> { + self.index_manager.drop_secondary_index(collection, field) + } + + /// Поиск по вторичному индексу + pub fn search_index(&self, collection: &str, field: &str, value: &str) -> Result, String> { + self.index_manager.search_index(collection, field, value) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С ACL ===== + + /// Добавляет пользователя с правами доступа + pub fn add_user(&self, username: &str, permissions: acl::UserPermissions) { + self.acl.add_user(username, permissions); + } + + /// Проверяет право на чтение + pub fn can_read(&self, username: &str, collection: &str) -> bool { + self.acl.can_read(username, collection) + } + + /// Проверяет право на запись + pub fn can_write(&self, username: &str, collection: &str) -> bool { + self.acl.can_write(username, collection) + } + + /// Проверяет право на создание + pub fn can_create(&self, username: &str, collection: &str) -> bool { + self.acl.can_create(username, collection) + } + + /// Проверяет право на удаление + pub fn can_delete(&self, username: &str, collection: &str) -> bool { + self.acl.can_delete(username, collection) + } + + // ===== МЕТОДЫ ДЛЯ РАБОТЫ С КЛАСТЕРОМ ===== + + /// Добавляет узел в кластер + pub async fn add_cluster_node(&self, node_url: String) -> Result<(), String> { + self.raft_cluster.add_node(node_url).await + } + + /// Удаляет узел из кластера + pub async fn remove_cluster_node(&self, node_id: &str) -> Result<(), String> { + // Для удаления узла нам нужно &mut self + // В реальной реализации это потребовало бы изменения архитектуры + Err("Node removal not implemented in this version".to_string()) + } + + /// Возвращает список узлов кластера + pub fn list_cluster_nodes(&self) -> Vec<(String, String)> { + self.raft_cluster.list_nodes() + } + + /// Возвращает статус кластера + pub fn get_cluster_status(&self) -> String { + self.raft_cluster.get_cluster_status() + } + + /// Возвращает роль текущего узла + pub fn get_current_node_role(&self) -> raft::NodeRole { + self.raft_cluster.get_current_node_role() + } + + /// Возвращает ID текущего узла + pub fn get_current_node_id(&self) -> String { + self.raft_cluster.get_current_node_id() + } + + // ===== МЕТОДЫ ДЛЯ ЭКСПОРТА/ИМПОРТА ===== + + /// Экспортирует данные в CSV файл + pub fn export_to_csv(&self, file_path: &str) -> Result<(), String> { + let data = self.storage.get_all_data(); + CsvExporter::export_data(&data, file_path) + } + + /// Импортирует данные из CSV файла + pub fn import_from_csv(&self, file_path: &str) -> Result<(), String> { + let data = CsvExporter::import_data(file_path)?; + self.storage.restore_from_data(data) + } +}