diff --git a/src/server/database.rs b/src/server/database.rs deleted file mode 100644 index 0965f69..0000000 --- a/src/server/database.rs +++ /dev/null @@ -1,640 +0,0 @@ -// src/server/database.rs -//! Wait-Free документо-ориентированная база данных Futriix -//! -//! Реализует wait-free доступ к данным с использованием атомарных -//! ссылок и lock-free структур данных для максимальной производительности. -//! Автоматически добавляет временные метки ко всем операциям с документами. - -#![allow(unused_imports)] -#![allow(dead_code)] - -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::RwLock; -use serde::{Serialize, Deserialize}; -use serde_json::Value; -use uuid::Uuid; -use dashmap::DashMap; - -use crate::common::Result; -use crate::common::protocol; - -/// Триггеры для коллекций -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum TriggerEvent { - BeforeCreate, - AfterCreate, - BeforeUpdate, - AfterUpdate, - BeforeDelete, - AfterDelete, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Trigger { - pub name: String, - pub event: TriggerEvent, - pub collection: String, - pub lua_code: String, -} - -/// Типы индексов -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum IndexType { - Primary, - Secondary, -} - -/// Структура индекса -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Index { - pub name: String, - pub index_type: IndexType, - pub field: String, - pub unique: bool, -} - -/// Wait-Free коллекция документов -#[derive(Clone)] -pub struct Collection { - name: String, - documents: Arc>>>, - sequence: Arc, - triggers: Arc>>, - indexes: Arc>>, - index_data: Arc>>>, -} - -impl Collection { - /// Создание новой wait-free коллекции - pub fn new(name: String) -> Self { - Self { - name, - documents: Arc::new(RwLock::new(std::collections::HashMap::new())), - sequence: Arc::new(AtomicU64::new(0)), - triggers: Arc::new(RwLock::new(Vec::new())), - indexes: Arc::new(RwLock::new(std::collections::HashMap::new())), - index_data: Arc::new(DashMap::new()), - } - } - - /// Функция для логирования операций с временной меткой - fn log_operation(&self, operation: &str, id: &str) { - use std::fs::OpenOptions; - use std::io::Write; - - // ИСПРАВЛЕНИЕ: Используем системное время с миллисекундами - let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); - let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n", - timestamp, self.name, operation, id); - - // Логируем в файл - if let Ok(mut file) = OpenOptions::new() - .create(true) - .append(true) - .open("futriix.log") - { - let _ = file.write_all(log_message.as_bytes()); - } - - // Также выводим в консоль для отладки - println!("{}", log_message.trim()); - } - - /// Добавление временной метки к документу - fn add_timestamp_to_document(&self, document: Vec, operation: &str) -> Result> { - // ИСПРАВЛЕНИЕ: Используем системное время с миллисекундами - let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); - - // Парсим документ как JSON - let mut doc_value: Value = serde_json::from_slice(&document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - // Добавляем временные метки - if let Value::Object(ref mut obj) = doc_value { - obj.insert("_timestamp".to_string(), Value::String(timestamp.clone())); - obj.insert("_operation".to_string(), Value::String(operation.to_string())); - } - - // Сериализуем обратно в байты - serde_json::to_vec(&doc_value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string())) - } - - /// Добавление триггера - pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { - let mut triggers = self.triggers.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - triggers.push(trigger); - Ok(()) - } - - /// Получение триггеров для события - #[allow(dead_code)] - pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result> { - let triggers = self.triggers.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - Ok(triggers.iter() - .filter(|t| t.event == event) - .cloned() - .collect()) - } - - /// Создание индекса - pub fn create_index(&self, index: Index) -> Result<()> { - let mut indexes = self.indexes.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - if indexes.contains_key(&index.name) { - return Err(crate::common::FutriixError::DatabaseError( - format!("Index already exists: {}", index.name) - )); - } - - // Создаем структуру для хранения данных индекса - self.index_data.insert(index.name.clone(), std::collections::HashMap::new()); - - let index_clone = index.clone(); - indexes.insert(index.name.clone(), index); - - // Перестраиваем индекс для существующих документов - self.rebuild_index(&index_clone.name)?; - - Ok(()) - } - - /// Перестроение индекса - fn rebuild_index(&self, index_name: &str) -> Result<()> { - let indexes = self.indexes.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let index = indexes.get(index_name) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Index not found: {}", index_name) - ))?; - - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let mut index_map = std::collections::HashMap::new(); - - for (id, document_bytes) in documents.iter() { - if let Ok(document) = serde_json::from_slice::(document_bytes) { - if let Some(field_value) = document.get(&index.field) { - // Конвертируем значение в строку для использования в HashMap - let value_str = field_value.to_string(); - let entry = index_map.entry(value_str).or_insert_with(Vec::new); - entry.push(id.clone()); - - // Проверка уникальности для уникальных индексов - if index.unique && entry.len() > 1 { - return Err(crate::common::FutriixError::DatabaseError( - format!("Duplicate value {} for unique index {}", field_value, index_name) - )); - } - } - } - } - - self.index_data.insert(index_name.to_string(), index_map); - Ok(()) - } - - /// Обновление индекса при изменении документа - fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> { - let indexes = self.indexes.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let new_doc_value: Value = serde_json::from_slice(new_document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - let old_doc_value: Option = old_document - .and_then(|doc| serde_json::from_slice(doc).ok()); - - for (index_name, index) in indexes.iter() { - if let Some(mut index_map) = self.index_data.get_mut(index_name) { - // Удаляем старые значения из индекса - if let Some(old_doc) = &old_doc_value { - if let Some(old_value) = old_doc.get(&index.field) { - let old_value_str = old_value.to_string(); - if let Some(entries) = index_map.get_mut(&old_value_str) { - entries.retain(|id| id != document_id); - if entries.is_empty() { - index_map.remove(&old_value_str); - } - } - } - } - - // Добавляем новые значения в индекс - if let Some(new_value) = new_doc_value.get(&index.field) { - let new_value_str = new_value.to_string(); - let entries = index_map.entry(new_value_str).or_insert_with(Vec::new); - - // Проверка уникальности - if index.unique && !entries.is_empty() && entries[0] != document_id { - return Err(crate::common::FutriixError::DatabaseError( - format!("Duplicate value {} for unique index {}", new_value, index_name) - )); - } - - if !entries.contains(&document_id.to_string()) { - entries.push(document_id.to_string()); - } - } - } - } - - Ok(()) - } - - /// Поиск по индексу - pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result> { - let index_map = self.index_data.get(index_name) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Index not found: {}", index_name) - ))?; - - let value_str = value.to_string(); - Ok(index_map.get(&value_str).cloned().unwrap_or_default()) - } - - /// Wait-Free создание документа с временной меткой - pub fn create_document(&self, document: Vec) -> Result { - let id = Uuid::new_v4().to_string(); - let seq = self.sequence.fetch_add(1, Ordering::SeqCst); - - // Добавляем временную метку к документу - let document_with_timestamp = self.add_timestamp_to_document(document, "create")?; - - // ИСПРАВЛЕНИЕ: Сначала проверяем индексы, потом вставляем документ - // Это предотвращает ситуацию, когда документ вставлен, но индексы не обновлены - self.update_indexes(None, &document_with_timestamp, &id)?; - - let mut documents = self.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - // Проверяем, не существует ли уже документ с таким ID - if documents.contains_key(&id) { - return Err(crate::common::FutriixError::DatabaseError( - format!("Document with ID {} already exists", id) - )); - } - - documents.insert(id.clone(), document_with_timestamp); - - // Логируем операцию - self.log_operation("create", &id); - - println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq); - Ok(id) - } - - /// Wait-Free чтение документа - pub fn read_document(&self, id: &str) -> Result>> { - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - // Логируем операцию чтения - self.log_operation("read", id); - - Ok(documents.get(id).cloned()) - } - - /// Wait-Free обновление документа с временной меткой - pub fn update_document(&self, id: &str, document: Vec) -> Result<()> { - let seq = self.sequence.fetch_add(1, Ordering::SeqCst); - - // Добавляем временную метку к документу - let document_with_timestamp = self.add_timestamp_to_document(document, "update")?; - - let mut documents = self.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - if let Some(old_document) = documents.get(id) { - // Обновляем индексы - self.update_indexes(Some(old_document), &document_with_timestamp, id)?; - - documents.insert(id.to_string(), document_with_timestamp); - - // Логируем операцию - self.log_operation("update", id); - - println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq); - Ok(()) - } else { - Err(crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - )) - } - } - - /// Wait-Free удаление документа с временной меткой - pub fn delete_document(&self, id: &str) -> Result<()> { - let seq = self.sequence.fetch_add(1, Ordering::SeqCst); - - let mut documents = self.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - if let Some(old_document) = documents.get(id) { - // Удаляем из индексов - self.update_indexes(Some(old_document), &[], id)?; - - documents.remove(id); - - // Логируем операцию - self.log_operation("delete", id); - - println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq); - Ok(()) - } else { - Err(crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - )) - } - } - - /// Wait-Free запрос документов - pub fn query_documents(&self, _filter: Vec) -> Result>> { - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - // Логируем операцию запроса - self.log_operation("query", "multiple"); - - // TODO: Реализовать wait-free фильтрацию на основе filter - let documents: Vec> = documents.values().cloned().collect(); - - Ok(documents) - } - - /// Получение имени коллекции (wait-free) - #[allow(dead_code)] - pub fn get_name(&self) -> &str { - &self.name - } - - /// Получение количества документов (wait-free) - #[allow(dead_code)] - pub fn count_documents(&self) -> Result { - let documents = self.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - Ok(documents.len()) - } -} - -/// Wait-Free база данных -#[derive(Clone)] -pub struct Database { - collections: Arc>, - procedures: Arc>>, - transactions: Arc>>, -} - -impl Database { - /// Создание новой wait-free базы данных - pub fn new() -> Self { - Self { - collections: Arc::new(DashMap::new()), - procedures: Arc::new(DashMap::new()), - transactions: Arc::new(DashMap::new()), - } - } - - /// Wait-Free получение или создание коллекции - pub fn get_collection(&self, name: &str) -> Collection { - if let Some(collection) = self.collections.get(name) { - return collection.clone(); - } - - // Создаем новую коллекцию wait-free способом - let new_collection = Collection::new(name.to_string()); - self.collections.insert(name.to_string(), new_collection.clone()); - - new_collection - } - - /// Wait-Free выполнение команды - pub fn execute_command(&self, command: protocol::Command) -> Result { - match command { - protocol::Command::Create { collection, document } => { - let coll = self.get_collection(&collection); - match coll.create_document(document) { - Ok(id) => Ok(protocol::Response::Success(id.into_bytes())), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Read { collection, id } => { - let coll = self.get_collection(&collection); - match coll.read_document(&id) { - Ok(Some(document)) => Ok(protocol::Response::Success(document)), - Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Update { collection, id, document } => { - let coll = self.get_collection(&collection); - match coll.update_document(&id, document) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Delete { collection, id } => { - let coll = self.get_collection(&collection); - match coll.delete_document(&id) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Query { collection, filter } => { - let coll = self.get_collection(&collection); - match coll.query_documents(filter) { - Ok(documents) => { - let json_docs: Vec = documents.into_iter() - .filter_map(|doc| serde_json::from_slice(&doc).ok()) - .collect(); - - match serde_json::to_vec(&json_docs) { - Ok(data) => Ok(protocol::Response::Success(data)), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::CreateProcedure { name, code } => { - // ИСПРАВЛЕНИЕ: Сохраняем процедуру в procedures map вместо коллекции - self.procedures.insert(name, code); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CallProcedure { name } => { - if let Some(code) = self.procedures.get(&name) { - // TODO: Выполнить Lua код процедуры - Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes())) - } else { - Ok(protocol::Response::Error(format!("Procedure not found: {}", name))) - } - } - protocol::Command::BeginTransaction { transaction_id } => { - if self.transactions.contains_key(&transaction_id) { - return Ok(protocol::Response::Error("Transaction already exists".to_string())); - } - - self.transactions.insert(transaction_id, Vec::new()); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CommitTransaction { transaction_id } => { - if let Some((_, commands)) = self.transactions.remove(&transaction_id) { - // ИСПРАВЛЕНИЕ: Выполняем все команды транзакции в правильном порядке - let mut results = Vec::new(); - for cmd in commands { - match self.execute_command(cmd) { - Ok(response) => results.push(response), - Err(e) => { - // Если одна команда не выполнилась, откатываем всю транзакцию - return Ok(protocol::Response::Error(format!("Transaction failed: {}", e))); - } - } - } - Ok(protocol::Response::Success(vec![])) - } else { - Ok(protocol::Response::Error("Transaction not found".to_string())) - } - } - protocol::Command::RollbackTransaction { transaction_id } => { - if self.transactions.remove(&transaction_id).is_some() { - Ok(protocol::Response::Success(vec![])) - } else { - Ok(protocol::Response::Error("Transaction not found".to_string())) - } - } - protocol::Command::CreateIndex { collection, index } => { - let coll = self.get_collection(&collection); - match coll.create_index(index) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::QueryByIndex { collection, index_name, value } => { - let coll = self.get_collection(&collection); - let value: Value = serde_json::from_slice(&value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - match coll.query_by_index(&index_name, &value) { - Ok(document_ids) => { - let result = serde_json::to_vec(&document_ids) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - Ok(protocol::Response::Success(result)) - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - // Обработка остальных команд... - _ => { - // TODO: Реализовать обработку остальных команд - Ok(protocol::Response::Error("Command not implemented".to_string())) - } - } - } - - /// Wait-Free получение статистики базы данных - #[allow(dead_code)] - pub fn get_stats(&self) -> Result> { - let mut stats = std::collections::HashMap::new(); - stats.insert("collections".to_string(), self.collections.len()); - stats.insert("procedures".to_string(), self.procedures.len()); - stats.insert("active_transactions".to_string(), self.transactions.len()); - - // Подсчет документов во всех коллекциях - let total_documents: usize = self.collections.iter() - .map(|entry| entry.value().count_documents().unwrap_or(0)) - .sum(); - - stats.insert("total_documents".to_string(), total_documents); - - Ok(stats) - } - - /// Создание бэкапа базы данных - pub fn create_backup(&self) -> Result>>> { - let mut backup = std::collections::HashMap::new(); - - for entry in self.collections.iter() { - let name = entry.key().clone(); - let collection = entry.value(); - - let documents = collection.documents.read() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - let mut collection_backup = std::collections::HashMap::new(); - for (id, document) in documents.iter() { - collection_backup.insert(id.clone(), document.clone()); - } - - backup.insert(name, collection_backup); - } - - Ok(backup) - } - - /// Восстановление из бэкапа - pub fn restore_from_backup(&self, backup: std::collections::HashMap>>) -> Result<()> { - // Очищаем существующие коллекции - self.collections.clear(); - - // Восстанавливаем данные из бэкапа - for (collection_name, documents) in backup { - let collection = Collection::new(collection_name.clone()); - - { - let mut collection_docs = collection.documents.write() - .map_err(|e| crate::common::FutriixError::IoError( - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - ))?; - - for (id, document) in documents { - collection_docs.insert(id, document); - } - } - - self.collections.insert(collection_name, collection); - } - - Ok(()) - } - - /// Добавление триггера к коллекции - pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { - let collection = self.get_collection(&trigger.collection); - collection.add_trigger(trigger) - } -}