From 1f89b71f21587f62688a28ba4d1f84a4006d6f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Fri, 5 Dec 2025 11:10:56 +0000 Subject: [PATCH] Upload files to "src/server" --- src/server/database.rs | 1079 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1079 insertions(+) create mode 100644 src/server/database.rs diff --git a/src/server/database.rs b/src/server/database.rs new file mode 100644 index 0000000..aded621 --- /dev/null +++ b/src/server/database.rs @@ -0,0 +1,1079 @@ +// src/server/database.rs +//! Lock-Free документо-ориентированная база данных Futriix +//! +//! Этот модуль реализует lock-free документо-ориентированную базу данных с использованием атомарных +//! ссылок и lock-free структур данных для максимальной производительности в многопоточных средах. +//! Все операции с документами автоматически дополняются временными метками для аудита. +//! +//! Основные компоненты: +//! - LockFreeHashMap: lock-free хэш-таблица на основе атомарных ссылок +//! - LockFreeDocumentStore: lock-free хранилище документов +//! - Collection: коллекция документов с поддержкой индексов и триггеров +//! - StmTransaction: транзакции с Software Transactional Memory +//! - Database: основная база данных с поддержкой CRUD операций + +#![allow(unused_imports)] +#![allow(dead_code)] + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, AtomicBool, AtomicUsize, Ordering}; +use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use serde_json::Value; +use uuid::Uuid; +use crossbeam::epoch::{self, Atomic, Owned, Guard}; +use crossbeam::queue::SegQueue; +use crossbeam::utils::CachePadded; + +use crate::common::Result; +use crate::common::protocol; + +/// События триггеров для коллекций документов +/// Определяет моменты выполнения триггеров в жизненном цикле документа +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum TriggerEvent { + /// Перед созданием документа + BeforeCreate, + /// После создания документа + AfterCreate, + /// Перед обновлением документа + BeforeUpdate, + /// После обновления документа + AfterUpdate, + /// Перед удалением документа + BeforeDelete, + /// После удаления документа + AfterDelete, +} + +/// Триггер для коллекции документов +/// Содержит код на Lua, выполняемый при определенном событии +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Trigger { + /// Уникальное имя триггера + pub name: String, + /// Событие, при котором срабатывает триггер + pub event: TriggerEvent, + /// Коллекция, к которой привязан триггер + pub collection: String, + /// Код на Lua для выполнения + pub lua_code: String, +} + +/// Типы индексов в коллекции +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum IndexType { + /// Первичный индекс (уникальный идентификатор) + Primary, + /// Вторичный индекс (по произвольным полям) + Secondary, +} + +/// Структура индекса для коллекции документов +/// Определяет поле для индексации и его свойства +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Index { + /// Уникальное имя индекса + pub name: String, + /// Тип индекса + pub index_type: IndexType, + /// Поле документа для индексации + pub field: String, + /// Флаг уникальности значений индекса + pub unique: bool, +} + +/// Lock-free хэш-таблица на основе атомарных ссылок +/// Использует epoch-based memory reclamation для безопасного управления памятью +#[derive(Clone)] +struct LockFreeHashMap { + /// Атомарная ссылка на внутренний HashMap + inner: Atomic>, +} + +impl LockFreeHashMap +where + K: Eq + std::hash::Hash + Clone, + V: Clone, +{ + /// Создает новую пустую lock-free хэш-таблицу + fn new() -> Self { + Self { + inner: Atomic::new(HashMap::new()), + } + } + + /// Вставляет пару ключ-значение в таблицу + /// Возвращает предыдущее значение, если ключ уже существовал + fn insert(&self, key: K, value: V, guard: &Guard) -> Option { + loop { + // Загружаем текущее состояние таблицы + let current = self.inner.load(Ordering::Acquire, guard); + let mut new_map = HashMap::new(); + + // Копируем существующие элементы + if let Some(ref map) = unsafe { current.as_ref() } { + new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone()))); + } + + // Вставляем новый элемент + let old_value = new_map.insert(key.clone(), value.clone()); + + // Пытаемся атомарно заменить таблицу + let new_ptr = Owned::new(new_map); + if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { + return old_value; + } + // CAS failed, retry + } + } + + /// Получает значение по ключу + fn get(&self, key: &K, guard: &Guard) -> Option { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + map.get(key).cloned() + } else { + None + } + } + + /// Удаляет значение по ключу + /// Возвращает удаленное значение, если ключ существовал + fn remove(&self, key: &K, guard: &Guard) -> Option { + loop { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + let mut new_map = HashMap::new(); + new_map.extend(map.iter().filter(|(k, _)| *k != key).map(|(k, v)| (k.clone(), v.clone()))); + + let new_ptr = Owned::new(new_map); + if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { + return map.get(key).cloned(); + } + // CAS failed, retry + } else { + return None; + } + } + } + + /// Возвращает количество элементов в таблице + fn len(&self, guard: &Guard) -> usize { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + map.len() + } else { + 0 + } + } + + /// Возвращает итератор по элементам таблицы + fn iter<'a>(&'a self, guard: &'a Guard) -> Vec<(&'a K, &'a V)> { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + map.iter().collect() + } else { + Vec::new() + } + } +} + +/// Lock-Free хранилище документов +/// Управляет хранением документов с поддержкой атомарных операций +struct LockFreeDocumentStore { + /// Хэш-таблица документов (ID -> содержимое) + documents: LockFreeHashMap>, + /// Счетчик последовательности для отслеживания изменений + sequence: AtomicU64, +} + +impl LockFreeDocumentStore { + /// Создает новое пустое хранилище документов + fn new() -> Self { + Self { + documents: LockFreeHashMap::new(), + sequence: AtomicU64::new(0), + } + } + + /// Вставляет документ в хранилище + /// Возвращает ошибку, если документ с таким ID уже существует + fn insert(&self, id: String, document: Vec, guard: &Guard) -> Result<()> { + if self.documents.get(&id, guard).is_some() { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document with ID {} already exists", id) + )); + } + + self.documents.insert(id, document, guard); + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Получает документ по ID + fn get(&self, id: &str, guard: &Guard) -> Option> { + self.documents.get(&id.to_string(), guard) + } + + /// Обновляет существующий документ + /// Возвращает ошибку, если документ не найден + fn update(&self, id: &str, document: Vec, guard: &Guard) -> Result<()> { + if self.documents.get(&id.to_string(), guard).is_none() { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + )); + } + + self.documents.insert(id.to_string(), document, guard); + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Удаляет документ по ID + /// Возвращает ошибку, если документ не найден + fn remove(&self, id: &str, guard: &Guard) -> Result<()> { + if self.documents.remove(&id.to_string(), guard).is_none() { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + )); + } + + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Возвращает все документы в хранилище + fn get_all(&self, guard: &Guard) -> Vec> { + self.documents.iter(guard) + .into_iter() + .map(|(_, v)| v.clone()) + .collect() + } + + /// Возвращает количество документов в хранилище + fn len(&self, guard: &Guard) -> usize { + self.documents.len(guard) + } +} + +/// Lock-Free коллекция документов +/// Представляет собой именованную коллекцию с поддержкой индексов и триггеров +#[derive(Clone)] +pub struct Collection { + /// Имя коллекции + name: String, + /// Хранилище документов коллекции + document_store: Arc, + /// Очередь триггеров коллекции + triggers: Arc>, + /// Индексы коллекции + indexes: Arc>, + /// Данные индексов (имя индекса -> хэш-таблица значений) + index_data: Arc>>>, +} + +impl Collection { + /// Создает новую пустую коллекцию с указанным именем + pub fn new(name: String) -> Self { + Self { + name, + document_store: Arc::new(LockFreeDocumentStore::new()), + triggers: Arc::new(SegQueue::new()), + indexes: Arc::new(LockFreeHashMap::new()), + index_data: Arc::new(LockFreeHashMap::new()), + } + } + + /// Логирует операцию с документом в файл и консоль + fn log_operation(&self, operation: &str, id: &str) { + use std::fs::OpenOptions; + use std::io::Write; + + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n", + timestamp, self.name, operation, id); + + if let Ok(mut file) = OpenOptions::new() + .create(true) + .append(true) + .open("futriix.log") + { + let _ = file.write_all(log_message.as_bytes()); + } + + println!("{}", log_message.trim()); + } + + /// Добавляет временную метку и информацию об операции к документу + fn add_timestamp_to_document(&self, document: Vec, operation: &str) -> Result> { + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + + let mut doc_value: Value = serde_json::from_slice(&document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + if let Value::Object(ref mut obj) = doc_value { + obj.insert("_timestamp".to_string(), Value::String(timestamp.clone())); + obj.insert("_operation".to_string(), Value::String(operation.to_string())); + } + + serde_json::to_vec(&doc_value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string())) + } + + /// Добавляет триггер в коллекцию + pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { + self.triggers.push(trigger); + Ok(()) + } + + /// Создает индекс для коллекции + pub fn create_index(&self, index: Index) -> Result<()> { + let guard = epoch::pin(); + + // Проверяем, не существует ли уже индекс с таким именем + if self.indexes.get(&index.name, &guard).is_some() { + return Err(crate::common::FutriixError::DatabaseError( + format!("Index already exists: {}", index.name) + )); + } + + // Создаем пустую хэш-таблицу для данных индекса + self.index_data.insert(index.name.clone(), LockFreeHashMap::new(), &guard); + // Сохраняем метаданные индекса + self.indexes.insert(index.name.clone(), index.clone(), &guard); + + // Перестраиваем индекс на основе существующих документов + self.rebuild_index(&index.name, &guard)?; + + Ok(()) + } + + /// Перестраивает индекс на основе всех документов в коллекции + fn rebuild_index(&self, index_name: &str, guard: &Guard) -> Result<()> { + // Получаем метаданные индекса + let index = self.indexes.get(&index_name.to_string(), guard) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Index not found: {}", index_name) + ))?; + + let all_documents = self.document_store.get_all(guard); + let index_map = LockFreeHashMap::new(); + + // Индексируем каждый документ + for document_bytes in all_documents { + if let Ok(document) = serde_json::from_slice::(&document_bytes) { + if let Some(field_value) = document.get(&index.field) { + let id = document.get("_id") + .or_else(|| document.get("id")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + if !id.is_empty() { + let value_str = field_value.to_string(); + + let guard2 = epoch::pin(); + let current_values = index_map.get(&value_str, &guard2) + .unwrap_or_else(Vec::new); + + let mut new_values = current_values.clone(); + new_values.push(id.clone()); + + // Проверяем уникальность для уникальных индексов + if index.unique && new_values.len() > 1 { + return Err(crate::common::FutriixError::DatabaseError( + format!("Duplicate value {} for unique index {}", field_value, index_name) + )); + } + + index_map.insert(value_str, new_values, guard); + } + } + } + } + + // Сохраняем обновленные данные индекса + self.index_data.insert(index_name.to_string(), index_map, guard); + Ok(()) + } + + /// Обновляет все индексы при изменении документа + fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str, guard: &Guard) -> Result<()> { + let new_doc_value: Value = serde_json::from_slice(new_document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + let old_doc_value: Option = old_document + .and_then(|doc| serde_json::from_slice(doc).ok()); + + // Получаем список всех индексов коллекции + let indexes: Vec<_> = { + let guard2 = epoch::pin(); + self.indexes.iter(&guard2) + .into_iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + }; + + // Обновляем каждый индекс + for (index_name, index) in indexes { + let guard2 = epoch::pin(); + if let Some(index_map) = self.index_data.get(&index_name, &guard2) { + // Удаляем старые записи из индекса + if let Some(old_doc) = &old_doc_value { + if let Some(old_value) = old_doc.get(&index.field) { + let old_value_str = old_value.to_string(); + if let Some(entries) = index_map.get(&old_value_str, guard) { + let mut new_entries = entries.clone(); + new_entries.retain(|id| id != document_id); + + if new_entries.is_empty() { + index_map.remove(&old_value_str, guard); + } else { + index_map.insert(old_value_str, new_entries, guard); + } + } + } + } + + // Добавляем новые записи в индекс + if let Some(new_value) = new_doc_value.get(&index.field) { + let new_value_str = new_value.to_string(); + let current_entries = index_map.get(&new_value_str, guard) + .unwrap_or_else(Vec::new); + + // Проверяем уникальность для уникальных индексов + if index.unique && !current_entries.is_empty() && current_entries[0] != document_id { + return Err(crate::common::FutriixError::DatabaseError( + format!("Duplicate value {} for unique index {}", new_value, index_name) + )); + } + + let mut new_entries = current_entries.clone(); + if !new_entries.contains(&document_id.to_string()) { + new_entries.push(document_id.to_string()); + } + + index_map.insert(new_value_str, new_entries, guard); + } + } + } + + Ok(()) + } + + /// Выполняет запрос по индексу + pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result> { + let guard = epoch::pin(); + let index_map = self.index_data.get(&index_name.to_string(), &guard) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Index not found: {}", index_name) + ))?; + + let value_str = value.to_string(); + Ok(index_map.get(&value_str, &guard) + .unwrap_or_default()) + } + + /// Создает новый документ в коллекции + pub fn create_document(&self, document: Vec) -> Result { + let guard = epoch::pin(); + + // Генерируем уникальный ID документа + let id = Uuid::new_v4().to_string(); + + // Добавляем поле _id к документу + let mut doc_value: Value = serde_json::from_slice(&document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + if let Value::Object(ref mut obj) = doc_value { + obj.insert("_id".to_string(), Value::String(id.clone())); + } + + let document_with_id = serde_json::to_vec(&doc_value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + // Добавляем временную метку + let document_with_timestamp = self.add_timestamp_to_document(document_with_id, "create")?; + + // Обновляем индексы + self.update_indexes(None, &document_with_timestamp, &id, &guard)?; + + // Сохраняем документ + self.document_store.insert(id.clone(), document_with_timestamp, &guard)?; + + // Логируем операцию + self.log_operation("create", &id); + + println!("Document created in collection '{}' with ID: {}", self.name, id); + Ok(id) + } + + /// Читает документ по ID + pub fn read_document(&self, id: &str) -> Result>> { + let guard = epoch::pin(); + + self.log_operation("read", id); + + Ok(self.document_store.get(id, &guard)) + } + + /// Обновляет существующий документ + pub fn update_document(&self, id: &str, document: Vec) -> Result<()> { + let guard = epoch::pin(); + + // Получаем старую версию документа + let old_document = self.document_store.get(id, &guard) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + ))?; + + // Добавляем временную метку + let document_with_timestamp = self.add_timestamp_to_document(document, "update")?; + + // Обновляем индексы + self.update_indexes(Some(&old_document), &document_with_timestamp, id, &guard)?; + + // Сохраняем обновленный документ + self.document_store.update(id, document_with_timestamp, &guard)?; + + // Логируем операцию + self.log_operation("update", id); + + println!("Document updated in collection '{}': {}", self.name, id); + Ok(()) + } + + /// Удаляет документ по ID + pub fn delete_document(&self, id: &str) -> Result<()> { + let guard = epoch::pin(); + + // Получаем старую версию документа для обновления индексов + let old_document = self.document_store.get(id, &guard) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + ))?; + + // Обновляем индексы (удаляем записи) + self.update_indexes(Some(&old_document), &[], id, &guard)?; + + // Удаляем документ + self.document_store.remove(id, &guard)?; + + // Логируем операцию + self.log_operation("delete", id); + + println!("Document deleted from collection '{}': {}", self.name, id); + Ok(()) + } + + /// Выполняет запрос документов с фильтром + pub fn query_documents(&self, filter: Vec) -> Result>> { + let guard = epoch::pin(); + + self.log_operation("query", "multiple"); + + let documents = self.document_store.get_all(&guard); + + // Применяем фильтр, если он указан + if !filter.is_empty() { + if let Ok(filter_value) = serde_json::from_slice::(&filter) { + return self.filter_documents(documents, &filter_value); + } + } + + Ok(documents) + } + + /// Фильтрует документы по заданному критерию + fn filter_documents(&self, documents: Vec>, filter: &Value) -> Result>> { + let mut filtered = Vec::new(); + + for document in documents { + if let Ok(doc_value) = serde_json::from_slice::(&document) { + if Self::document_matches_filter(&doc_value, filter) { + filtered.push(document); + } + } + } + + Ok(filtered) + } + + /// Проверяет, соответствует ли документ фильтру + fn document_matches_filter(document: &Value, filter: &Value) -> bool { + match filter { + Value::Object(filter_obj) => { + if let Value::Object(doc_obj) = document { + // Исправлено: добавлен ключевое слово `in` в цикле for + for (key, filter_value) in filter_obj { + match doc_obj.get(key) { + Some(doc_value) => { + if doc_value != filter_value { + return false; + } + } + None => return false, + } + } + true + } else { + false + } + } + _ => true, // Пустой фильтр всегда соответствует + } + } + + /// Возвращает имя коллекции + #[allow(dead_code)] + pub fn get_name(&self) -> &str { + &self.name + } + + /// Возвращает количество документов в коллекции + #[allow(dead_code)] + pub fn count_documents(&self) -> Result { + let guard = epoch::pin(); + Ok(self.document_store.len(&guard)) + } +} + +/// STM транзакция с Software Transactional Memory +/// Предоставляет атомарное выполнение нескольких команд +pub struct StmTransaction { + /// Команды в транзакции + commands: Vec, + /// Флаг активности транзакции + active: AtomicBool, +} + +// Ручная реализация Clone для StmTransaction +// Необходима из-за наличия AtomicBool, который не реализует Clone автоматически +impl Clone for StmTransaction { + fn clone(&self) -> Self { + Self { + commands: self.commands.clone(), + active: AtomicBool::new(self.active.load(Ordering::SeqCst)), + } + } +} + +impl StmTransaction { + /// Создает новую пустую транзакцию + fn new() -> Self { + Self { + commands: Vec::new(), + active: AtomicBool::new(true), + } + } + + /// Добавляет команду в транзакцию + fn add_command(&mut self, command: protocol::Command) -> Result<()> { + if !self.active.load(Ordering::Acquire) { + return Err(crate::common::FutriixError::DatabaseError( + "Transaction is not active".to_string() + )); + } + self.commands.push(command); + Ok(()) + } + + /// Выполняет коммит транзакции + fn commit(self, db: &Database) -> Result> { + if !self.active.load(Ordering::Acquire) { + return Err(crate::common::FutriixError::DatabaseError( + "Transaction is not active".to_string() + )); + } + + self.active.store(false, Ordering::Release); + + // Параллельное выполнение команд транзакции + let mut handles = Vec::new(); + + for cmd in self.commands { + let db_clone = db.clone(); + let handle = std::thread::spawn(move || { + db_clone.execute_command(cmd) + }); + handles.push(handle); + } + + // Собираем результаты выполнения команд + let mut results = Vec::new(); + for handle in handles { + match handle.join() { + Ok(Ok(response)) => results.push(response), + Ok(Err(e)) => { + return Err(crate::common::FutriixError::DatabaseError( + format!("Transaction failed: {}", e) + )); + } + Err(_) => { + return Err(crate::common::FutriixError::DatabaseError( + "Thread panicked during transaction commit".to_string() + )); + } + } + } + + Ok(results) + } + + /// Откатывает транзакцию + fn rollback(self) -> Result<()> { + if !self.active.load(Ordering::Acquire) { + return Err(crate::common::FutriixError::DatabaseError( + "Transaction is not active".to_string() + )); + } + + self.active.store(false, Ordering::Release); + Ok(()) + } +} + +/// Lock-Free база данных Futriix +/// Основной компонент системы, управляющий коллекциями и транзакциями +#[derive(Clone)] +pub struct Database { + /// Коллекции базы данных + collections: Arc>, + /// Хранимые процедуры + procedures: Arc>>, + /// Активные транзакции + transactions: Arc>, + /// Триггеры по коллекциям + triggers: Arc>>, +} + +impl Database { + /// Создает новую пустую базу данных + pub fn new() -> Self { + Self { + collections: Arc::new(LockFreeHashMap::new()), + procedures: Arc::new(LockFreeHashMap::new()), + transactions: Arc::new(LockFreeHashMap::new()), + triggers: Arc::new(LockFreeHashMap::new()), + } + } + + /// Получает коллекцию по имени (создает новую, если не существует) + pub fn get_collection(&self, name: &str) -> Collection { + let guard = epoch::pin(); + + if let Some(collection) = self.collections.get(&name.to_string(), &guard) { + return collection; + } + + let new_collection = Collection::new(name.to_string()); + self.collections.insert(name.to_string(), new_collection.clone(), &guard); + + new_collection + } + + /// Получает коллекцию по имени (без создания новой) + fn get_collection_opt(&self, name: &str) -> Option { + let guard = epoch::pin(); + self.collections.get(&name.to_string(), &guard) + } + + /// Выполняет команду в базе данных + pub fn execute_command(&self, command: protocol::Command) -> Result { + match command { + protocol::Command::Create { collection, document } => { + self.execute_triggers(&collection, TriggerEvent::BeforeCreate, &document)?; + + let coll = self.get_collection(&collection); + match coll.create_document(document.clone()) { + Ok(id) => { + self.execute_triggers(&collection, TriggerEvent::AfterCreate, &document)?; + Ok(protocol::Response::Success(id.into_bytes())) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Read { collection, id } => { + let coll = self.get_collection(&collection); + match coll.read_document(&id) { + Ok(Some(document)) => Ok(protocol::Response::Success(document)), + Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Update { collection, id, document } => { + let old_document = { + let coll = self.get_collection(&collection); + coll.read_document(&id).ok().flatten() + }; + + self.execute_triggers(&collection, TriggerEvent::BeforeUpdate, &document)?; + + let coll = self.get_collection(&collection); + match coll.update_document(&id, document.clone()) { + Ok(_) => { + self.execute_triggers(&collection, TriggerEvent::AfterUpdate, &document)?; + Ok(protocol::Response::Success(vec![])) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Delete { collection, id } => { + let old_document = { + let coll = self.get_collection(&collection); + coll.read_document(&id).ok().flatten() + }; + + if let Some(doc) = &old_document { + self.execute_triggers(&collection, TriggerEvent::BeforeDelete, doc)?; + } + + let coll = self.get_collection(&collection); + match coll.delete_document(&id) { + Ok(_) => { + if let Some(doc) = &old_document { + self.execute_triggers(&collection, TriggerEvent::AfterDelete, doc)?; + } + Ok(protocol::Response::Success(vec![])) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Query { collection, filter } => { + let coll = self.get_collection(&collection); + match coll.query_documents(filter) { + Ok(documents) => { + let json_docs: Vec = documents.into_iter() + .filter_map(|doc| serde_json::from_slice(&doc).ok()) + .collect(); + + match serde_json::to_vec(&json_docs) { + Ok(data) => Ok(protocol::Response::Success(data)), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::CreateProcedure { name, code } => { + let guard = epoch::pin(); + self.procedures.insert(name, code, &guard); + Ok(protocol::Response::Success(vec![])) + } + protocol::Command::CallProcedure { name } => { + let guard = epoch::pin(); + if let Some(code) = self.procedures.get(&name, &guard) { + Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes())) + } else { + Ok(protocol::Response::Error(format!("Procedure not found: {}", name))) + } + } + protocol::Command::BeginTransaction { transaction_id } => { + let guard = epoch::pin(); + if self.transactions.get(&transaction_id, &guard).is_some() { + return Ok(protocol::Response::Error("Transaction already exists".to_string())); + } + + let transaction = StmTransaction::new(); + self.transactions.insert(transaction_id, transaction, &guard); + Ok(protocol::Response::Success(vec![])) + } + protocol::Command::CommitTransaction { transaction_id } => { + let guard = epoch::pin(); + if let Some(transaction) = self.transactions.remove(&transaction_id, &guard) { + match transaction.commit(self) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } else { + Ok(protocol::Response::Error("Transaction not found".to_string())) + } + } + protocol::Command::RollbackTransaction { transaction_id } => { + let guard = epoch::pin(); + if let Some(transaction) = self.transactions.remove(&transaction_id, &guard) { + match transaction.rollback() { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } else { + Ok(protocol::Response::Error("Transaction not found".to_string())) + } + } + protocol::Command::CreateIndex { collection, index } => { + let coll = self.get_collection(&collection); + match coll.create_index(index) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::QueryByIndex { collection, index_name, value } => { + let coll = self.get_collection(&collection); + let value: Value = serde_json::from_slice(&value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + match coll.query_by_index(&index_name, &value) { + Ok(document_ids) => { + let result = serde_json::to_vec(&document_ids) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + Ok(protocol::Response::Success(result)) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + _ => { + Ok(protocol::Response::Error("Command not implemented".to_string())) + } + } + } + + /// Выполняет триггеры для указанной коллекции и события + fn execute_triggers(&self, collection: &str, event: TriggerEvent, document: &[u8]) -> Result<()> { + let guard = epoch::pin(); + if let Some(triggers) = self.triggers.get(&collection.to_string(), &guard) { + for trigger in triggers { + if trigger.event == event { + println!("Executing trigger '{}' for collection '{}' on {:?}", + trigger.name, collection, event); + } + } + } + Ok(()) + } + + /// Добавляет триггер в базу данных + pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { + let guard = epoch::pin(); + + let current_triggers = self.triggers.get(&trigger.collection, &guard) + .unwrap_or_else(Vec::new); + + // Проверяем уникальность имени триггера + for existing in ¤t_triggers { + if existing.name == trigger.name { + return Err(crate::common::FutriixError::DatabaseError( + format!("Trigger '{}' already exists for collection '{}'", + trigger.name, trigger.collection) + )); + } + } + + let mut new_triggers = current_triggers.clone(); + new_triggers.push(trigger.clone()); + + self.triggers.insert(trigger.collection.clone(), new_triggers, &guard); + Ok(()) + } + + /// Возвращает статистику базы данных + #[allow(dead_code)] + pub fn get_stats(&self) -> Result> { + let guard = epoch::pin(); + let mut stats = std::collections::HashMap::new(); + + let collections_count = { + let guard2 = epoch::pin(); + self.collections.len(&guard2) + }; + + let procedures_count = { + let guard2 = epoch::pin(); + self.procedures.len(&guard2) + }; + + let transactions_count = { + let guard2 = epoch::pin(); + self.transactions.len(&guard2) + }; + + let triggers_count = { + let guard2 = epoch::pin(); + self.triggers.len(&guard2) + }; + + stats.insert("collections".to_string(), collections_count); + stats.insert("procedures".to_string(), procedures_count); + stats.insert("active_transactions".to_string(), transactions_count); + stats.insert("triggers".to_string(), triggers_count); + + let mut total_documents = 0; + let collections: Vec<_> = { + let guard2 = epoch::pin(); + self.collections.iter(&guard2) + .into_iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect() + }; + + for (_, collection) in collections { + total_documents += collection.count_documents().unwrap_or(0); + } + + stats.insert("total_documents".to_string(), total_documents); + + Ok(stats) + } + + /// Создает бэкап базы данных + pub fn create_backup(&self) -> Result>>> { + let guard = epoch::pin(); + let mut backup = std::collections::HashMap::new(); + + let collections: Vec<_> = self.collections.iter(&guard) + .into_iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + for (name, collection) in collections { + let documents = collection.query_documents(vec![])?; + + let mut collection_backup = std::collections::HashMap::new(); + for document in documents { + if let Ok(doc_value) = serde_json::from_slice::(&document) { + if let Some(id) = doc_value.get("_id").and_then(|v| v.as_str()) { + collection_backup.insert(id.to_string(), document); + } + } + } + + backup.insert(name, collection_backup); + } + + Ok(backup) + } + + /// Восстанавливает базу данных из бэкапа + pub fn restore_from_backup(&self, backup: std::collections::HashMap>>) -> Result<()> { + let guard = epoch::pin(); + + // Очищаем коллекции через lock-free операцию + loop { + let current = self.collections.inner.load(Ordering::Acquire, &guard); + let new_map = HashMap::new(); + let new_ptr = Owned::new(new_map); + + if self.collections.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, &guard).is_ok() { + break; + } + } + + // Восстанавливаем документы из бэкапа + for (collection_name, documents) in backup { + let collection = self.get_collection(&collection_name); + + for (id, document) in documents { + let command = protocol::Command::Create { + collection: collection_name.clone(), + document, + }; + + if let Err(e) = self.execute_command(command) { + eprintln!("Failed to restore document {}: {}", id, e); + } + } + } + + Ok(()) + } +}