diff --git a/src/server/database.rs b/src/server/database.rs deleted file mode 100644 index aded621..0000000 --- a/src/server/database.rs +++ /dev/null @@ -1,1079 +0,0 @@ -// src/server/database.rs -//! Lock-Free документо-ориентированная база данных Futriix -//! -//! Этот модуль реализует lock-free документо-ориентированную базу данных с использованием атомарных -//! ссылок и lock-free структур данных для максимальной производительности в многопоточных средах. -//! Все операции с документами автоматически дополняются временными метками для аудита. -//! -//! Основные компоненты: -//! - LockFreeHashMap: lock-free хэш-таблица на основе атомарных ссылок -//! - LockFreeDocumentStore: lock-free хранилище документов -//! - Collection: коллекция документов с поддержкой индексов и триггеров -//! - StmTransaction: транзакции с Software Transactional Memory -//! - Database: основная база данных с поддержкой CRUD операций - -#![allow(unused_imports)] -#![allow(dead_code)] - -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, AtomicBool, AtomicUsize, Ordering}; -use std::collections::HashMap; -use serde::{Serialize, Deserialize}; -use serde_json::Value; -use uuid::Uuid; -use crossbeam::epoch::{self, Atomic, Owned, Guard}; -use crossbeam::queue::SegQueue; -use crossbeam::utils::CachePadded; - -use crate::common::Result; -use crate::common::protocol; - -/// События триггеров для коллекций документов -/// Определяет моменты выполнения триггеров в жизненном цикле документа -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum TriggerEvent { - /// Перед созданием документа - BeforeCreate, - /// После создания документа - AfterCreate, - /// Перед обновлением документа - BeforeUpdate, - /// После обновления документа - AfterUpdate, - /// Перед удалением документа - BeforeDelete, - /// После удаления документа - AfterDelete, -} - -/// Триггер для коллекции документов -/// Содержит код на Lua, выполняемый при определенном событии -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Trigger { - /// Уникальное имя триггера - pub name: String, - /// Событие, при котором срабатывает триггер - pub event: TriggerEvent, - /// Коллекция, к которой привязан триггер - pub collection: String, - /// Код на Lua для выполнения - pub lua_code: String, -} - -/// Типы индексов в коллекции -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum IndexType { - /// Первичный индекс (уникальный идентификатор) - Primary, - /// Вторичный индекс (по произвольным полям) - Secondary, -} - -/// Структура индекса для коллекции документов -/// Определяет поле для индексации и его свойства -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Index { - /// Уникальное имя индекса - pub name: String, - /// Тип индекса - pub index_type: IndexType, - /// Поле документа для индексации - pub field: String, - /// Флаг уникальности значений индекса - pub unique: bool, -} - -/// Lock-free хэш-таблица на основе атомарных ссылок -/// Использует epoch-based memory reclamation для безопасного управления памятью -#[derive(Clone)] -struct LockFreeHashMap { - /// Атомарная ссылка на внутренний HashMap - inner: Atomic>, -} - -impl LockFreeHashMap -where - K: Eq + std::hash::Hash + Clone, - V: Clone, -{ - /// Создает новую пустую lock-free хэш-таблицу - fn new() -> Self { - Self { - inner: Atomic::new(HashMap::new()), - } - } - - /// Вставляет пару ключ-значение в таблицу - /// Возвращает предыдущее значение, если ключ уже существовал - fn insert(&self, key: K, value: V, guard: &Guard) -> Option { - loop { - // Загружаем текущее состояние таблицы - let current = self.inner.load(Ordering::Acquire, guard); - let mut new_map = HashMap::new(); - - // Копируем существующие элементы - if let Some(ref map) = unsafe { current.as_ref() } { - new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone()))); - } - - // Вставляем новый элемент - let old_value = new_map.insert(key.clone(), value.clone()); - - // Пытаемся атомарно заменить таблицу - let new_ptr = Owned::new(new_map); - if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { - return old_value; - } - // CAS failed, retry - } - } - - /// Получает значение по ключу - fn get(&self, key: &K, guard: &Guard) -> Option { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - map.get(key).cloned() - } else { - None - } - } - - /// Удаляет значение по ключу - /// Возвращает удаленное значение, если ключ существовал - fn remove(&self, key: &K, guard: &Guard) -> Option { - loop { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - let mut new_map = HashMap::new(); - new_map.extend(map.iter().filter(|(k, _)| *k != key).map(|(k, v)| (k.clone(), v.clone()))); - - let new_ptr = Owned::new(new_map); - if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { - return map.get(key).cloned(); - } - // CAS failed, retry - } else { - return None; - } - } - } - - /// Возвращает количество элементов в таблице - fn len(&self, guard: &Guard) -> usize { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - map.len() - } else { - 0 - } - } - - /// Возвращает итератор по элементам таблицы - fn iter<'a>(&'a self, guard: &'a Guard) -> Vec<(&'a K, &'a V)> { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - map.iter().collect() - } else { - Vec::new() - } - } -} - -/// Lock-Free хранилище документов -/// Управляет хранением документов с поддержкой атомарных операций -struct LockFreeDocumentStore { - /// Хэш-таблица документов (ID -> содержимое) - documents: LockFreeHashMap>, - /// Счетчик последовательности для отслеживания изменений - sequence: AtomicU64, -} - -impl LockFreeDocumentStore { - /// Создает новое пустое хранилище документов - fn new() -> Self { - Self { - documents: LockFreeHashMap::new(), - sequence: AtomicU64::new(0), - } - } - - /// Вставляет документ в хранилище - /// Возвращает ошибку, если документ с таким ID уже существует - fn insert(&self, id: String, document: Vec, guard: &Guard) -> Result<()> { - if self.documents.get(&id, guard).is_some() { - return Err(crate::common::FutriixError::DatabaseError( - format!("Document with ID {} already exists", id) - )); - } - - self.documents.insert(id, document, guard); - self.sequence.fetch_add(1, Ordering::SeqCst); - Ok(()) - } - - /// Получает документ по ID - fn get(&self, id: &str, guard: &Guard) -> Option> { - self.documents.get(&id.to_string(), guard) - } - - /// Обновляет существующий документ - /// Возвращает ошибку, если документ не найден - fn update(&self, id: &str, document: Vec, guard: &Guard) -> Result<()> { - if self.documents.get(&id.to_string(), guard).is_none() { - return Err(crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - )); - } - - self.documents.insert(id.to_string(), document, guard); - self.sequence.fetch_add(1, Ordering::SeqCst); - Ok(()) - } - - /// Удаляет документ по ID - /// Возвращает ошибку, если документ не найден - fn remove(&self, id: &str, guard: &Guard) -> Result<()> { - if self.documents.remove(&id.to_string(), guard).is_none() { - return Err(crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - )); - } - - self.sequence.fetch_add(1, Ordering::SeqCst); - Ok(()) - } - - /// Возвращает все документы в хранилище - fn get_all(&self, guard: &Guard) -> Vec> { - self.documents.iter(guard) - .into_iter() - .map(|(_, v)| v.clone()) - .collect() - } - - /// Возвращает количество документов в хранилище - fn len(&self, guard: &Guard) -> usize { - self.documents.len(guard) - } -} - -/// Lock-Free коллекция документов -/// Представляет собой именованную коллекцию с поддержкой индексов и триггеров -#[derive(Clone)] -pub struct Collection { - /// Имя коллекции - name: String, - /// Хранилище документов коллекции - document_store: Arc, - /// Очередь триггеров коллекции - triggers: Arc>, - /// Индексы коллекции - indexes: Arc>, - /// Данные индексов (имя индекса -> хэш-таблица значений) - index_data: Arc>>>, -} - -impl Collection { - /// Создает новую пустую коллекцию с указанным именем - pub fn new(name: String) -> Self { - Self { - name, - document_store: Arc::new(LockFreeDocumentStore::new()), - triggers: Arc::new(SegQueue::new()), - indexes: Arc::new(LockFreeHashMap::new()), - index_data: Arc::new(LockFreeHashMap::new()), - } - } - - /// Логирует операцию с документом в файл и консоль - fn log_operation(&self, operation: &str, id: &str) { - use std::fs::OpenOptions; - use std::io::Write; - - let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); - let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n", - timestamp, self.name, operation, id); - - if let Ok(mut file) = OpenOptions::new() - .create(true) - .append(true) - .open("futriix.log") - { - let _ = file.write_all(log_message.as_bytes()); - } - - println!("{}", log_message.trim()); - } - - /// Добавляет временную метку и информацию об операции к документу - fn add_timestamp_to_document(&self, document: Vec, operation: &str) -> Result> { - let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); - - let mut doc_value: Value = serde_json::from_slice(&document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - if let Value::Object(ref mut obj) = doc_value { - obj.insert("_timestamp".to_string(), Value::String(timestamp.clone())); - obj.insert("_operation".to_string(), Value::String(operation.to_string())); - } - - serde_json::to_vec(&doc_value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string())) - } - - /// Добавляет триггер в коллекцию - pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { - self.triggers.push(trigger); - Ok(()) - } - - /// Создает индекс для коллекции - pub fn create_index(&self, index: Index) -> Result<()> { - let guard = epoch::pin(); - - // Проверяем, не существует ли уже индекс с таким именем - if self.indexes.get(&index.name, &guard).is_some() { - return Err(crate::common::FutriixError::DatabaseError( - format!("Index already exists: {}", index.name) - )); - } - - // Создаем пустую хэш-таблицу для данных индекса - self.index_data.insert(index.name.clone(), LockFreeHashMap::new(), &guard); - // Сохраняем метаданные индекса - self.indexes.insert(index.name.clone(), index.clone(), &guard); - - // Перестраиваем индекс на основе существующих документов - self.rebuild_index(&index.name, &guard)?; - - Ok(()) - } - - /// Перестраивает индекс на основе всех документов в коллекции - fn rebuild_index(&self, index_name: &str, guard: &Guard) -> Result<()> { - // Получаем метаданные индекса - let index = self.indexes.get(&index_name.to_string(), guard) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Index not found: {}", index_name) - ))?; - - let all_documents = self.document_store.get_all(guard); - let index_map = LockFreeHashMap::new(); - - // Индексируем каждый документ - for document_bytes in all_documents { - if let Ok(document) = serde_json::from_slice::(&document_bytes) { - if let Some(field_value) = document.get(&index.field) { - let id = document.get("_id") - .or_else(|| document.get("id")) - .and_then(|v| v.as_str()) - .unwrap_or_default() - .to_string(); - - if !id.is_empty() { - let value_str = field_value.to_string(); - - let guard2 = epoch::pin(); - let current_values = index_map.get(&value_str, &guard2) - .unwrap_or_else(Vec::new); - - let mut new_values = current_values.clone(); - new_values.push(id.clone()); - - // Проверяем уникальность для уникальных индексов - if index.unique && new_values.len() > 1 { - return Err(crate::common::FutriixError::DatabaseError( - format!("Duplicate value {} for unique index {}", field_value, index_name) - )); - } - - index_map.insert(value_str, new_values, guard); - } - } - } - } - - // Сохраняем обновленные данные индекса - self.index_data.insert(index_name.to_string(), index_map, guard); - Ok(()) - } - - /// Обновляет все индексы при изменении документа - fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str, guard: &Guard) -> Result<()> { - let new_doc_value: Value = serde_json::from_slice(new_document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - let old_doc_value: Option = old_document - .and_then(|doc| serde_json::from_slice(doc).ok()); - - // Получаем список всех индексов коллекции - let indexes: Vec<_> = { - let guard2 = epoch::pin(); - self.indexes.iter(&guard2) - .into_iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect() - }; - - // Обновляем каждый индекс - for (index_name, index) in indexes { - let guard2 = epoch::pin(); - if let Some(index_map) = self.index_data.get(&index_name, &guard2) { - // Удаляем старые записи из индекса - if let Some(old_doc) = &old_doc_value { - if let Some(old_value) = old_doc.get(&index.field) { - let old_value_str = old_value.to_string(); - if let Some(entries) = index_map.get(&old_value_str, guard) { - let mut new_entries = entries.clone(); - new_entries.retain(|id| id != document_id); - - if new_entries.is_empty() { - index_map.remove(&old_value_str, guard); - } else { - index_map.insert(old_value_str, new_entries, guard); - } - } - } - } - - // Добавляем новые записи в индекс - if let Some(new_value) = new_doc_value.get(&index.field) { - let new_value_str = new_value.to_string(); - let current_entries = index_map.get(&new_value_str, guard) - .unwrap_or_else(Vec::new); - - // Проверяем уникальность для уникальных индексов - if index.unique && !current_entries.is_empty() && current_entries[0] != document_id { - return Err(crate::common::FutriixError::DatabaseError( - format!("Duplicate value {} for unique index {}", new_value, index_name) - )); - } - - let mut new_entries = current_entries.clone(); - if !new_entries.contains(&document_id.to_string()) { - new_entries.push(document_id.to_string()); - } - - index_map.insert(new_value_str, new_entries, guard); - } - } - } - - Ok(()) - } - - /// Выполняет запрос по индексу - pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result> { - let guard = epoch::pin(); - let index_map = self.index_data.get(&index_name.to_string(), &guard) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Index not found: {}", index_name) - ))?; - - let value_str = value.to_string(); - Ok(index_map.get(&value_str, &guard) - .unwrap_or_default()) - } - - /// Создает новый документ в коллекции - pub fn create_document(&self, document: Vec) -> Result { - let guard = epoch::pin(); - - // Генерируем уникальный ID документа - let id = Uuid::new_v4().to_string(); - - // Добавляем поле _id к документу - let mut doc_value: Value = serde_json::from_slice(&document) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - if let Value::Object(ref mut obj) = doc_value { - obj.insert("_id".to_string(), Value::String(id.clone())); - } - - let document_with_id = serde_json::to_vec(&doc_value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - // Добавляем временную метку - let document_with_timestamp = self.add_timestamp_to_document(document_with_id, "create")?; - - // Обновляем индексы - self.update_indexes(None, &document_with_timestamp, &id, &guard)?; - - // Сохраняем документ - self.document_store.insert(id.clone(), document_with_timestamp, &guard)?; - - // Логируем операцию - self.log_operation("create", &id); - - println!("Document created in collection '{}' with ID: {}", self.name, id); - Ok(id) - } - - /// Читает документ по ID - pub fn read_document(&self, id: &str) -> Result>> { - let guard = epoch::pin(); - - self.log_operation("read", id); - - Ok(self.document_store.get(id, &guard)) - } - - /// Обновляет существующий документ - pub fn update_document(&self, id: &str, document: Vec) -> Result<()> { - let guard = epoch::pin(); - - // Получаем старую версию документа - let old_document = self.document_store.get(id, &guard) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - ))?; - - // Добавляем временную метку - let document_with_timestamp = self.add_timestamp_to_document(document, "update")?; - - // Обновляем индексы - self.update_indexes(Some(&old_document), &document_with_timestamp, id, &guard)?; - - // Сохраняем обновленный документ - self.document_store.update(id, document_with_timestamp, &guard)?; - - // Логируем операцию - self.log_operation("update", id); - - println!("Document updated in collection '{}': {}", self.name, id); - Ok(()) - } - - /// Удаляет документ по ID - pub fn delete_document(&self, id: &str) -> Result<()> { - let guard = epoch::pin(); - - // Получаем старую версию документа для обновления индексов - let old_document = self.document_store.get(id, &guard) - .ok_or_else(|| crate::common::FutriixError::DatabaseError( - format!("Document not found: {}", id) - ))?; - - // Обновляем индексы (удаляем записи) - self.update_indexes(Some(&old_document), &[], id, &guard)?; - - // Удаляем документ - self.document_store.remove(id, &guard)?; - - // Логируем операцию - self.log_operation("delete", id); - - println!("Document deleted from collection '{}': {}", self.name, id); - Ok(()) - } - - /// Выполняет запрос документов с фильтром - pub fn query_documents(&self, filter: Vec) -> Result>> { - let guard = epoch::pin(); - - self.log_operation("query", "multiple"); - - let documents = self.document_store.get_all(&guard); - - // Применяем фильтр, если он указан - if !filter.is_empty() { - if let Ok(filter_value) = serde_json::from_slice::(&filter) { - return self.filter_documents(documents, &filter_value); - } - } - - Ok(documents) - } - - /// Фильтрует документы по заданному критерию - fn filter_documents(&self, documents: Vec>, filter: &Value) -> Result>> { - let mut filtered = Vec::new(); - - for document in documents { - if let Ok(doc_value) = serde_json::from_slice::(&document) { - if Self::document_matches_filter(&doc_value, filter) { - filtered.push(document); - } - } - } - - Ok(filtered) - } - - /// Проверяет, соответствует ли документ фильтру - fn document_matches_filter(document: &Value, filter: &Value) -> bool { - match filter { - Value::Object(filter_obj) => { - if let Value::Object(doc_obj) = document { - // Исправлено: добавлен ключевое слово `in` в цикле for - for (key, filter_value) in filter_obj { - match doc_obj.get(key) { - Some(doc_value) => { - if doc_value != filter_value { - return false; - } - } - None => return false, - } - } - true - } else { - false - } - } - _ => true, // Пустой фильтр всегда соответствует - } - } - - /// Возвращает имя коллекции - #[allow(dead_code)] - pub fn get_name(&self) -> &str { - &self.name - } - - /// Возвращает количество документов в коллекции - #[allow(dead_code)] - pub fn count_documents(&self) -> Result { - let guard = epoch::pin(); - Ok(self.document_store.len(&guard)) - } -} - -/// STM транзакция с Software Transactional Memory -/// Предоставляет атомарное выполнение нескольких команд -pub struct StmTransaction { - /// Команды в транзакции - commands: Vec, - /// Флаг активности транзакции - active: AtomicBool, -} - -// Ручная реализация Clone для StmTransaction -// Необходима из-за наличия AtomicBool, который не реализует Clone автоматически -impl Clone for StmTransaction { - fn clone(&self) -> Self { - Self { - commands: self.commands.clone(), - active: AtomicBool::new(self.active.load(Ordering::SeqCst)), - } - } -} - -impl StmTransaction { - /// Создает новую пустую транзакцию - fn new() -> Self { - Self { - commands: Vec::new(), - active: AtomicBool::new(true), - } - } - - /// Добавляет команду в транзакцию - fn add_command(&mut self, command: protocol::Command) -> Result<()> { - if !self.active.load(Ordering::Acquire) { - return Err(crate::common::FutriixError::DatabaseError( - "Transaction is not active".to_string() - )); - } - self.commands.push(command); - Ok(()) - } - - /// Выполняет коммит транзакции - fn commit(self, db: &Database) -> Result> { - if !self.active.load(Ordering::Acquire) { - return Err(crate::common::FutriixError::DatabaseError( - "Transaction is not active".to_string() - )); - } - - self.active.store(false, Ordering::Release); - - // Параллельное выполнение команд транзакции - let mut handles = Vec::new(); - - for cmd in self.commands { - let db_clone = db.clone(); - let handle = std::thread::spawn(move || { - db_clone.execute_command(cmd) - }); - handles.push(handle); - } - - // Собираем результаты выполнения команд - let mut results = Vec::new(); - for handle in handles { - match handle.join() { - Ok(Ok(response)) => results.push(response), - Ok(Err(e)) => { - return Err(crate::common::FutriixError::DatabaseError( - format!("Transaction failed: {}", e) - )); - } - Err(_) => { - return Err(crate::common::FutriixError::DatabaseError( - "Thread panicked during transaction commit".to_string() - )); - } - } - } - - Ok(results) - } - - /// Откатывает транзакцию - fn rollback(self) -> Result<()> { - if !self.active.load(Ordering::Acquire) { - return Err(crate::common::FutriixError::DatabaseError( - "Transaction is not active".to_string() - )); - } - - self.active.store(false, Ordering::Release); - Ok(()) - } -} - -/// Lock-Free база данных Futriix -/// Основной компонент системы, управляющий коллекциями и транзакциями -#[derive(Clone)] -pub struct Database { - /// Коллекции базы данных - collections: Arc>, - /// Хранимые процедуры - procedures: Arc>>, - /// Активные транзакции - transactions: Arc>, - /// Триггеры по коллекциям - triggers: Arc>>, -} - -impl Database { - /// Создает новую пустую базу данных - pub fn new() -> Self { - Self { - collections: Arc::new(LockFreeHashMap::new()), - procedures: Arc::new(LockFreeHashMap::new()), - transactions: Arc::new(LockFreeHashMap::new()), - triggers: Arc::new(LockFreeHashMap::new()), - } - } - - /// Получает коллекцию по имени (создает новую, если не существует) - pub fn get_collection(&self, name: &str) -> Collection { - let guard = epoch::pin(); - - if let Some(collection) = self.collections.get(&name.to_string(), &guard) { - return collection; - } - - let new_collection = Collection::new(name.to_string()); - self.collections.insert(name.to_string(), new_collection.clone(), &guard); - - new_collection - } - - /// Получает коллекцию по имени (без создания новой) - fn get_collection_opt(&self, name: &str) -> Option { - let guard = epoch::pin(); - self.collections.get(&name.to_string(), &guard) - } - - /// Выполняет команду в базе данных - pub fn execute_command(&self, command: protocol::Command) -> Result { - match command { - protocol::Command::Create { collection, document } => { - self.execute_triggers(&collection, TriggerEvent::BeforeCreate, &document)?; - - let coll = self.get_collection(&collection); - match coll.create_document(document.clone()) { - Ok(id) => { - self.execute_triggers(&collection, TriggerEvent::AfterCreate, &document)?; - Ok(protocol::Response::Success(id.into_bytes())) - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Read { collection, id } => { - let coll = self.get_collection(&collection); - match coll.read_document(&id) { - Ok(Some(document)) => Ok(protocol::Response::Success(document)), - Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Update { collection, id, document } => { - let old_document = { - let coll = self.get_collection(&collection); - coll.read_document(&id).ok().flatten() - }; - - self.execute_triggers(&collection, TriggerEvent::BeforeUpdate, &document)?; - - let coll = self.get_collection(&collection); - match coll.update_document(&id, document.clone()) { - Ok(_) => { - self.execute_triggers(&collection, TriggerEvent::AfterUpdate, &document)?; - Ok(protocol::Response::Success(vec![])) - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Delete { collection, id } => { - let old_document = { - let coll = self.get_collection(&collection); - coll.read_document(&id).ok().flatten() - }; - - if let Some(doc) = &old_document { - self.execute_triggers(&collection, TriggerEvent::BeforeDelete, doc)?; - } - - let coll = self.get_collection(&collection); - match coll.delete_document(&id) { - Ok(_) => { - if let Some(doc) = &old_document { - self.execute_triggers(&collection, TriggerEvent::AfterDelete, doc)?; - } - Ok(protocol::Response::Success(vec![])) - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::Query { collection, filter } => { - let coll = self.get_collection(&collection); - match coll.query_documents(filter) { - Ok(documents) => { - let json_docs: Vec = documents.into_iter() - .filter_map(|doc| serde_json::from_slice(&doc).ok()) - .collect(); - - match serde_json::to_vec(&json_docs) { - Ok(data) => Ok(protocol::Response::Success(data)), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::CreateProcedure { name, code } => { - let guard = epoch::pin(); - self.procedures.insert(name, code, &guard); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CallProcedure { name } => { - let guard = epoch::pin(); - if let Some(code) = self.procedures.get(&name, &guard) { - Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes())) - } else { - Ok(protocol::Response::Error(format!("Procedure not found: {}", name))) - } - } - protocol::Command::BeginTransaction { transaction_id } => { - let guard = epoch::pin(); - if self.transactions.get(&transaction_id, &guard).is_some() { - return Ok(protocol::Response::Error("Transaction already exists".to_string())); - } - - let transaction = StmTransaction::new(); - self.transactions.insert(transaction_id, transaction, &guard); - Ok(protocol::Response::Success(vec![])) - } - protocol::Command::CommitTransaction { transaction_id } => { - let guard = epoch::pin(); - if let Some(transaction) = self.transactions.remove(&transaction_id, &guard) { - match transaction.commit(self) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } else { - Ok(protocol::Response::Error("Transaction not found".to_string())) - } - } - protocol::Command::RollbackTransaction { transaction_id } => { - let guard = epoch::pin(); - if let Some(transaction) = self.transactions.remove(&transaction_id, &guard) { - match transaction.rollback() { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } else { - Ok(protocol::Response::Error("Transaction not found".to_string())) - } - } - protocol::Command::CreateIndex { collection, index } => { - let coll = self.get_collection(&collection); - match coll.create_index(index) { - Ok(_) => Ok(protocol::Response::Success(vec![])), - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - protocol::Command::QueryByIndex { collection, index_name, value } => { - let coll = self.get_collection(&collection); - let value: Value = serde_json::from_slice(&value) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - - match coll.query_by_index(&index_name, &value) { - Ok(document_ids) => { - let result = serde_json::to_vec(&document_ids) - .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; - Ok(protocol::Response::Success(result)) - } - Err(e) => Ok(protocol::Response::Error(e.to_string())), - } - } - _ => { - Ok(protocol::Response::Error("Command not implemented".to_string())) - } - } - } - - /// Выполняет триггеры для указанной коллекции и события - fn execute_triggers(&self, collection: &str, event: TriggerEvent, document: &[u8]) -> Result<()> { - let guard = epoch::pin(); - if let Some(triggers) = self.triggers.get(&collection.to_string(), &guard) { - for trigger in triggers { - if trigger.event == event { - println!("Executing trigger '{}' for collection '{}' on {:?}", - trigger.name, collection, event); - } - } - } - Ok(()) - } - - /// Добавляет триггер в базу данных - pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { - let guard = epoch::pin(); - - let current_triggers = self.triggers.get(&trigger.collection, &guard) - .unwrap_or_else(Vec::new); - - // Проверяем уникальность имени триггера - for existing in ¤t_triggers { - if existing.name == trigger.name { - return Err(crate::common::FutriixError::DatabaseError( - format!("Trigger '{}' already exists for collection '{}'", - trigger.name, trigger.collection) - )); - } - } - - let mut new_triggers = current_triggers.clone(); - new_triggers.push(trigger.clone()); - - self.triggers.insert(trigger.collection.clone(), new_triggers, &guard); - Ok(()) - } - - /// Возвращает статистику базы данных - #[allow(dead_code)] - pub fn get_stats(&self) -> Result> { - let guard = epoch::pin(); - let mut stats = std::collections::HashMap::new(); - - let collections_count = { - let guard2 = epoch::pin(); - self.collections.len(&guard2) - }; - - let procedures_count = { - let guard2 = epoch::pin(); - self.procedures.len(&guard2) - }; - - let transactions_count = { - let guard2 = epoch::pin(); - self.transactions.len(&guard2) - }; - - let triggers_count = { - let guard2 = epoch::pin(); - self.triggers.len(&guard2) - }; - - stats.insert("collections".to_string(), collections_count); - stats.insert("procedures".to_string(), procedures_count); - stats.insert("active_transactions".to_string(), transactions_count); - stats.insert("triggers".to_string(), triggers_count); - - let mut total_documents = 0; - let collections: Vec<_> = { - let guard2 = epoch::pin(); - self.collections.iter(&guard2) - .into_iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect() - }; - - for (_, collection) in collections { - total_documents += collection.count_documents().unwrap_or(0); - } - - stats.insert("total_documents".to_string(), total_documents); - - Ok(stats) - } - - /// Создает бэкап базы данных - pub fn create_backup(&self) -> Result>>> { - let guard = epoch::pin(); - let mut backup = std::collections::HashMap::new(); - - let collections: Vec<_> = self.collections.iter(&guard) - .into_iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - - for (name, collection) in collections { - let documents = collection.query_documents(vec![])?; - - let mut collection_backup = std::collections::HashMap::new(); - for document in documents { - if let Ok(doc_value) = serde_json::from_slice::(&document) { - if let Some(id) = doc_value.get("_id").and_then(|v| v.as_str()) { - collection_backup.insert(id.to_string(), document); - } - } - } - - backup.insert(name, collection_backup); - } - - Ok(backup) - } - - /// Восстанавливает базу данных из бэкапа - pub fn restore_from_backup(&self, backup: std::collections::HashMap>>) -> Result<()> { - let guard = epoch::pin(); - - // Очищаем коллекции через lock-free операцию - loop { - let current = self.collections.inner.load(Ordering::Acquire, &guard); - let new_map = HashMap::new(); - let new_ptr = Owned::new(new_map); - - if self.collections.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, &guard).is_ok() { - break; - } - } - - // Восстанавливаем документы из бэкапа - for (collection_name, documents) in backup { - let collection = self.get_collection(&collection_name); - - for (id, document) in documents { - let command = protocol::Command::Create { - collection: collection_name.clone(), - document, - }; - - if let Err(e) = self.execute_command(command) { - eprintln!("Failed to restore document {}: {}", id, e); - } - } - } - - Ok(()) - } -}