From 00f8205f942cb9c7dd5a34c91e9d7c8aa5feaeae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Fri, 28 Nov 2025 20:23:52 +0000 Subject: [PATCH] Delete src/server/database.rs --- src/server/database.rs | 718 ----------------------------------------- 1 file changed, 718 deletions(-) delete mode 100755 src/server/database.rs diff --git a/src/server/database.rs b/src/server/database.rs deleted file mode 100755 index 4461a61..0000000 --- a/src/server/database.rs +++ /dev/null @@ -1,718 +0,0 @@ -// src/server/database.rs -//! Wait-Free документо-ориентированная база данных Futriix -//! -//! Реализует wait-free доступ к данным с использованием атомарных -//! ссылок и lock-free структур данных для максимальной производительности. -//! Автоматически добавляет временные метки ко всем операциям с документами. - -#![allow(unused_imports)] -#![allow(dead_code)] - -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::RwLock; -use serde::{Serialize, Deserialize}; -use serde_json::Value; -use uuid::Uuid; -use dashmap::DashMap; - -use crate::common::Result; -use crate::common::protocol; - -/// Триггеры для коллекций -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum TriggerEvent { - BeforeCreate, - AfterCreate, - BeforeUpdate, - AfterUpdate, - BeforeDelete, - AfterDelete, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Trigger { - pub name: String, - pub event: TriggerEvent, - pub collection: String, - pub lua_code: String, -} - -/// Типы индексов -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum IndexType { - Primary, - Secondary, -} - -/// Структура индекса -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Index { - pub name: String, - pub index_type: IndexType, - pub field: String, - pub unique: bool, -} - -/// Wait-Free коллекция документов -#[derive(Clone)] -pub struct Collection { - name: String, - documents: Arc>>>, - sequence: Arc, - triggers: Arc>>, - indexes: Arc>>, - index_data: Arc>>>, -} - -impl Collection { - /// Создание новой wait-free коллекции - pub fn new(name: String) -> Self { - Self { - name, - documents: Arc::new(RwLock::new(std::collections::HashMap::new())), - sequence: Arc::new(AtomicU64::new(0)), - triggers: Arc::new(RwLock::new(Vec::new())), - indexes: Arc::new(RwLock::new(std::collections::HashMap::new())), - index_data: Arc::new(DashMap::new()), - } - } - - /// Функция для логирования операций с временной меткой - fn log_operation(&self, operation: &str, id: &str) { - use std::fs::OpenOptions; - use std::io::Write; - - let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); - let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n", - timestamp, self.name, operation, id); - - // Логируем в файл - if let Ok(mut file) = OpenOptions::new() - .create(true) - .append(true) - .open("futriix.log") - { - let _ = file.write_all(log_message.as_bytes()); - } - - // Также выводим в консоль для отладки - println!("{}", log_message.trim()); - } - - /// Добавление временной метки к документу - fn add_timestamp_to_document(&self, document: Vec, operation: &str) -> Result> { - let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); - - // Парсим документ как JSON - let mut doc_value: Value = serde_json::from_slice(&document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - // Добавляем временные метки - if let Value::Object(ref mut obj) = doc_value { - obj.insert("_timestamp".to_string(), Value::String(timestamp.clone())); - obj.insert("_operation".to_string(), Value::String(operation.to_string())); - } - - // Сериализуем обратно в байты - serde_json::to_vec(&doc_value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string())) - } - - /// Добавление триггера - pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { - let mut triggers = self.triggers.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - triggers.push(trigger); - Ok(()) - } - - /// Получение триггеров для события - #[allow(dead_code)] - pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result> { - let triggers = self.triggers.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - Ok(triggers.iter() - .filter(|t| t.event == event) - .cloned() - .collect()) - } - - /// Создание индекса - pub fn create_index(&self, index: Index) -> Result<()> { - let mut indexes = self.indexes.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - if indexes.contains_key(&index.name) { - return Err(crate::common::FutriixError::DatabaseError( - format!("Index already exists: {}", index.name) - )); - } - - // Создаем структуру для хранения данных индекса - self.index_data.insert(index.name.clone(), std::collections::HashMap::new()); - - let index_clone = index.clone(); - indexes.insert(index.name.clone(), index); - - // Перестраиваем индекс для существующих документов - self.rebuild_index(&index_clone.name)?; - - Ok(()) - } - - /// Перестроение индекса - fn rebuild_index(&self, index_name: &str) -> Result<()> { - let indexes = self.indexes.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let index = indexes.get(index_name) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Index not found: {}", index_name) - ))?; - - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let mut index_map = std::collections::HashMap::new(); - - for (id, document_bytes) in documents.iter() { - if let Ok(document) = serde_json::from_slice::(document_bytes) { - if let Some(field_value) = document.get(&index.field) { - // Конвертируем значение в строку для использования в HashMap - let value_str = field_value.to_string(); - let entry = index_map.entry(value_str).or_insert_with(Vec::new); - entry.push(id.clone()); - - // Проверка уникальности для уникальных индексов - if index.unique && entry.len() > 1 { - return Err(crate::common::FutriixError::DatabaseError( - format!("Duplicate value {} for unique index {}", field_value, index_name) - )); - } - } - } - } - - self.index_data.insert(index_name.to_string(), index_map); - Ok(()) - } - - /// Обновление индекса при изменении документа - fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> { - let indexes = self.indexes.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let new_doc_value: Value = serde_json::from_slice(new_document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - let old_doc_value: Option = old_document - .and_then(|doc| serde_json::from_slice(doc).ok()); - - for (index_name, index) in indexes.iter() { - if let Some(mut index_map) = self.index_data.get_mut(index_name) { - // Удаляем старые значения из индекса - if let Some(old_doc) = &old_doc_value { - if let Some(old_value) = old_doc.get(&index.field) { - let old_value_str = old_value.to_string(); - if let Some(entries) = index_map.get_mut(&old_value_str) { - entries.retain(|id| id != document_id); - if entries.is_empty() { - index_map.remove(&old_value_str); - } - } - } - } - - // Добавляем новые значения в индекс - if let Some(new_value) = new_doc_value.get(&index.field) { - let new_value_str = new_value.to_string(); - let entries = index_map.entry(new_value_str).or_insert_with(Vec::new); - - // Проверка уникальности - if index.unique && !entries.is_empty() && entries[0] != document_id { - return Err(crate::common::FutriixError::DatabaseError( - format!("Duplicate value {} for unique index {}", new_value, index_name) - )); - } - - if !entries.contains(&document_id.to_string()) { - entries.push(document_id.to_string()); - } - } - } - } - - Ok(()) - } - - /// Поиск по индексу - pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result> { - let index_map = self.index_data.get(index_name) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Index not found: {}", index_name) - ))?; - - let value_str = value.to_string(); - Ok(index_map.get(&value_str).cloned().unwrap_or_default()) - } - - /// Wait-Free создание документа с временной меткой - pub fn create_document(&self, document: Vec) -> Result { - let id = Uuid::new_v4().to_string(); - let seq = self.sequence.fetch_add(1, Ordering::SeqCst); - - // Добавляем временную метку к документу - let document_with_timestamp = self.add_timestamp_to_document(document, "create")?; - - let mut documents = self.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - // Проверяем уникальность перед вставкой - self.update_indexes(None, &document_with_timestamp, &id)?; - - documents.insert(id.clone(), document_with_timestamp); - - // Логируем операцию - self.log_operation("create", &id); - - println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq); - Ok(id) - } - - /// Wait-Free чтение документа - pub fn read_document(&self, id: &str) -> Result>> { - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - // Логируем операцию чтения - self.log_operation("read", id); - - Ok(documents.get(id).cloned()) - } - - /// Wait-Free обновление документа с временной меткой - pub fn update_document(&self, id: &str, document: Vec) -> Result<()> { - let seq = self.sequence.fetch_add(1, Ordering::SeqCst); - - // Добавляем временную метку к документу - let document_with_timestamp = self.add_timestamp_to_document(document, "update")?; - - let mut documents = self.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - if let Some(old_document) = documents.get(id) { - // Обновляем индексы - self.update_indexes(Some(old_document), &document_with_timestamp, id)?; - - documents.insert(id.to_string(), document_with_timestamp); - - // Логируем операцию - self.log_operation("update", id); - - println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq); - Ok(()) - } else { - Err(crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - )) - } - } - - /// Wait-Free удаление документа с временной меткой - pub fn delete_document(&self, id: &str) -> Result<()> { - let seq = self.sequence.fetch_add(1, Ordering::SeqCst); - - let mut documents = self.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - if let Some(old_document) = documents.get(id) { - // Удаляем из индексов - self.update_indexes(Some(old_document), &[], id)?; - - documents.remove(id); - - // Логируем операцию - self.log_operation("delete", id); - - println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq); - Ok(()) - } else { - Err(crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - )) - } - } - - /// Wait-Free запрос документов - pub fn query_documents(&self, _filter: Vec) -> Result>> { - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - // Логируем операцию запроса - self.log_operation("query", "multiple"); - - // TODO: Реализовать wait-free фильтрацию на основе filter - let documents: Vec> = documents.values().cloned().collect(); - - Ok(documents) - } - - /// Получение имени коллекции (wait-free) - #[allow(dead_code)] - pub fn get_name(&self) -> &str { - &self.name - } - - /// Получение количества документов (wait-free) - #[allow(dead_code)] - pub fn count_documents(&self) -> Result { - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - Ok(documents.len()) - } -} - -/// Wait-Free база данных -#[derive(Clone)] -pub struct Database { - collections: Arc>, - procedures: Arc>>, - transactions: Arc>>, -} - -impl Database { - /// Создание новой wait-free базы данных - pub fn new() -> Self { - Self { - collections: Arc::new(DashMap::new()), - procedures: Arc::new(DashMap::new()), - transactions: Arc::new(DashMap::new()), - } - } - - /// Wait-Free получение или создание коллекции - pub fn get_collection(&self, name: &str) -> Collection { - if let Some(collection) = self.collections.get(name) { - return collection.clone(); - } - - // Создаем новую коллекцию wait-free способом - let new_collection = Collection::new(name.to_string()); - self.collections.insert(name.to_string(), new_collection.clone()); - - new_collection - } - - /// Wait-Free выполнение команды - pub fn execute_command(&self, command: protocol::Command) -> Result { - match command { - protocol::Command::Create { collection, document } => { - let coll = self.get_collection(&collection); - match coll.create_document(document) { - Ok(id) => Ok(protocol::Response::Success(id.into_bytes())), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Read { collection, id } => { - let coll = self.get_collection(&collection); - match coll.read_document(&id) { - Ok(Some(document)) => Ok(protocol::Response::Success(document)), - Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Update { collection, id, document } => { - let coll = self.get_collection(&collection); - match coll.update_document(&id, document) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Delete { collection, id } => { - let coll = self.get_collection(&collection); - match coll.delete_document(&id) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Query { collection, filter } => { - let coll = self.get_collection(&collection); - match coll.query_documents(filter) { - Ok(documents) => { - let json_docs: Vec = documents.into_iter() - .filter_map(|doc| serde_json::from_slice(&doc).ok()) - .collect(); - - match serde_json::to_vec(&json_docs) { - Ok(data) => Ok(protocol::Response::Success(data)), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::CreateProcedure { name, code } => { - self.procedures.insert(name, code); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CallProcedure { name } => { - if self.procedures.contains_key(&name) { - // TODO: Выполнить Lua код процедура - Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes())) - } else { - Ok(protocol::Response::Error(format!("Procedure not found: {}", name))) - } - } - protocol::Command::BeginTransaction { transaction_id } => { - if self.transactions.contains_key(&transaction_id) { - return Ok(protocol::Response::Error("Transaction already exists".to_string())); - } - - self.transactions.insert(transaction_id, Vec::new()); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CommitTransaction { transaction_id } => { - if let Some((_, commands)) = self.transactions.remove(&transaction_id) { - // Выполняем все команды транзакции wait-free способом - for cmd in commands { - if let Err(e) = self.execute_command(cmd) { - return Ok(protocol::Response::Error(format!("Transaction failed: {}", e))); - } - } - - Ok(protocol::Response::Success(vec![])) - } else { - Ok(protocol::Response::Error("Transaction not found".to_string())) - } - } - protocol::Command::RollbackTransaction { transaction_id } => { - if self.transactions.remove(&transaction_id).is_some() { - Ok(protocol::Response::Success(vec![])) - } else { - Ok(protocol::Response::Error("Transaction not found".to_string())) - } - } - protocol::Command::CreateIndex { collection, index } => { - let coll = self.get_collection(&collection); - match coll.create_index(index) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::QueryByIndex { collection, index_name, value } => { - let coll = self.get_collection(&collection); - let value: Value = serde_json::from_slice(&value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - match coll.query_by_index(&index_name, &value) { - Ok(document_ids) => { - let result = serde_json::to_vec(&document_ids) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - Ok(protocol::Response::Success(result)) - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - // Обработка новых команд для шардинга, constraints, компрессии и глобальных индексов - protocol::Command::AddShardNode { node_id, address, capacity } => { - // TODO: Реализовать добавление шард-узла - println!("Adding shard node: {} at {} with capacity {}", node_id, address, capacity); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::RemoveShardNode { node_id } => { - // TODO: Реализовать удаление шард-узла - println!("Removing shard node: {}", node_id); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::MigrateShard { collection, from_node, to_node, shard_key } => { - // TODO: Реализовать миграцию шарда - println!("Migrating shard from {} to {} for collection {} with key {}", from_node, to_node, collection, shard_key); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::RebalanceCluster => { - // TODO: Реализовать ребалансировку кластера - println!("Rebalancing cluster"); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::GetClusterStatus => { - // TODO: Реализовать получение статуса кластера - let status = protocol::ClusterStatus { - nodes: vec![], - total_capacity: 0, - total_used: 0, - rebalance_needed: false, - cluster_formed: false, - leader_exists: false, - raft_nodes: vec![], - }; - match protocol::serialize(&status) { - Ok(data) => Ok(protocol::Response::Success(data)), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::StartElection => { - // TODO: Реализовать Raft выборы - println!("Starting Raft election"); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::GetRaftNodes => { - // TODO: Реализовать получение Raft узлов - println!("Getting Raft nodes"); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::AddConstraint { collection, constraint_name, constraint_type, field, value } => { - // TODO: Реализовать добавление constraint - println!("Adding constraint {} to collection {}: {} {} with value {:?}", constraint_name, collection, constraint_type, field, value); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::RemoveConstraint { collection, constraint_name } => { - // TODO: Реализовать удаление constraint - println!("Removing constraint {} from collection {}", constraint_name, collection); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::EnableCompression { collection, algorithm } => { - // TODO: Реализовать включение компрессии - println!("Enabling {} compression for collection {}", algorithm, collection); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::DisableCompression { collection } => { - // TODO: Реализовать отключение компрессии - println!("Disabling compression for collection {}", collection); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CreateGlobalIndex { name, field, unique } => { - // TODO: Реализовать создание глобального индекса - println!("Creating global index {} on field {} (unique: {})", name, field, unique); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::QueryGlobalIndex { index_name, value } => { - // TODO: Реализовать запрос по глобальному индексу - println!("Querying global index {} with value {:?}", index_name, value); - Ok(protocol::Response::Success(vec![])) - } - // Новые команды для CSV - protocol::Command::ImportCsv { collection, file_path } => { - // TODO: Интегрировать с CsvManager - println!("Importing CSV to collection '{}' from '{}'", collection, file_path); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::ExportCsv { collection, file_path } => { - // TODO: Интегрировать с CsvManager - println!("Exporting collection '{}' to CSV file '{}'", collection, file_path); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::ListCsvFiles => { - // TODO: Интегрировать с CsvManager - println!("Listing CSV files"); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::GetImportProgress { collection } => { - // TODO: Интегрировать с CsvManager - println!("Getting import progress for '{}'", collection); - Ok(protocol::Response::Success(vec![])) - } - } - } - - /// Wait-Free получение статистики базы данных - #[allow(dead_code)] - pub fn get_stats(&self) -> Result> { - let mut stats = std::collections::HashMap::new(); - stats.insert("collections".to_string(), self.collections.len()); - stats.insert("procedures".to_string(), self.procedures.len()); - stats.insert("active_transactions".to_string(), self.transactions.len()); - - // Подсчет документов во всех коллекциях - let total_documents: usize = self.collections.iter() - .map(|entry| entry.value().count_documents().unwrap_or(0)) - .sum(); - - stats.insert("total_documents".to_string(), total_documents); - - Ok(stats) - } - - /// Создание бэкапа базы данных - pub fn create_backup(&self) -> Result>>> { - let mut backup = std::collections::HashMap::new(); - - for entry in self.collections.iter() { - let name = entry.key().clone(); - let collection = entry.value(); - - let documents = collection.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let mut collection_backup = std::collections::HashMap::new(); - for (id, document) in documents.iter() { - collection_backup.insert(id.clone(), document.clone()); - } - - backup.insert(name, collection_backup); - } - - Ok(backup) - } - - /// Восстановление из бэкапа - pub fn restore_from_backup(&self, backup: std::collections::HashMap>>) -> Result<()> { - // Очищаем существующие коллекции - self.collections.clear(); - - // Восстанавливаем данные из бэкапа - for (collection_name, documents) in backup { - let collection = Collection::new(collection_name.clone()); - - { - let mut collection_docs = collection.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - for (id, document) in documents { - collection_docs.insert(id, document); - } - } - - self.collections.insert(collection_name, collection); - } - - Ok(()) - } - - /// Добавление триггера к коллекции - pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { - let collection = self.get_collection(&trigger.collection); - collection.add_trigger(trigger) - } -}