From def8a9580c5cc2bb3745f99f728515f215e49bbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Mon, 1 Dec 2025 18:04:55 +0000 Subject: [PATCH] Fixed index creation bug --- src/server/database.rs | 640 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 640 insertions(+) create mode 100644 src/server/database.rs diff --git a/src/server/database.rs b/src/server/database.rs new file mode 100644 index 0000000..0965f69 --- /dev/null +++ b/src/server/database.rs @@ -0,0 +1,640 @@ +// src/server/database.rs +//! Wait-Free документо-ориентированная база данных Futriix +//! +//! Реализует wait-free доступ к данным с использованием атомарных +//! ссылок и lock-free структур данных для максимальной производительности. +//! Автоматически добавляет временные метки ко всем операциям с документами. + +#![allow(unused_imports)] +#![allow(dead_code)] + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::RwLock; +use serde::{Serialize, Deserialize}; +use serde_json::Value; +use uuid::Uuid; +use dashmap::DashMap; + +use crate::common::Result; +use crate::common::protocol; + +/// Триггеры для коллекций +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum TriggerEvent { + BeforeCreate, + AfterCreate, + BeforeUpdate, + AfterUpdate, + BeforeDelete, + AfterDelete, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Trigger { + pub name: String, + pub event: TriggerEvent, + pub collection: String, + pub lua_code: String, +} + +/// Типы индексов +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum IndexType { + Primary, + Secondary, +} + +/// Структура индекса +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Index { + pub name: String, + pub index_type: IndexType, + pub field: String, + pub unique: bool, +} + +/// Wait-Free коллекция документов +#[derive(Clone)] +pub struct Collection { + name: String, + documents: Arc>>>, + sequence: Arc, + triggers: Arc>>, + indexes: Arc>>, + index_data: Arc>>>, +} + +impl Collection { + /// Создание новой wait-free коллекции + pub fn new(name: String) -> Self { + Self { + name, + documents: Arc::new(RwLock::new(std::collections::HashMap::new())), + sequence: Arc::new(AtomicU64::new(0)), + triggers: Arc::new(RwLock::new(Vec::new())), + indexes: Arc::new(RwLock::new(std::collections::HashMap::new())), + index_data: Arc::new(DashMap::new()), + } + } + + /// Функция для логирования операций с временной меткой + fn log_operation(&self, operation: &str, id: &str) { + use std::fs::OpenOptions; + use std::io::Write; + + // ИСПРАВЛЕНИЕ: Используем системное время с миллисекундами + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n", + timestamp, self.name, operation, id); + + // Логируем в файл + if let Ok(mut file) = OpenOptions::new() + .create(true) + .append(true) + .open("futriix.log") + { + let _ = file.write_all(log_message.as_bytes()); + } + + // Также выводим в консоль для отладки + println!("{}", log_message.trim()); + } + + /// Добавление временной метки к документу + fn add_timestamp_to_document(&self, document: Vec, operation: &str) -> Result> { + // ИСПРАВЛЕНИЕ: Используем системное время с миллисекундами + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + + // Парсим документ как JSON + let mut doc_value: Value = serde_json::from_slice(&document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + // Добавляем временные метки + if let Value::Object(ref mut obj) = doc_value { + obj.insert("_timestamp".to_string(), Value::String(timestamp.clone())); + obj.insert("_operation".to_string(), Value::String(operation.to_string())); + } + + // Сериализуем обратно в байты + serde_json::to_vec(&doc_value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string())) + } + + /// Добавление триггера + pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { + let mut triggers = self.triggers.write() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + triggers.push(trigger); + Ok(()) + } + + /// Получение триггеров для события + #[allow(dead_code)] + pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result> { + let triggers = self.triggers.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + Ok(triggers.iter() + .filter(|t| t.event == event) + .cloned() + .collect()) + } + + /// Создание индекса + pub fn create_index(&self, index: Index) -> Result<()> { + let mut indexes = self.indexes.write() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + if indexes.contains_key(&index.name) { + return Err(crate::common::FutriixError::DatabaseError( + format!("Index already exists: {}", index.name) + )); + } + + // Создаем структуру для хранения данных индекса + self.index_data.insert(index.name.clone(), std::collections::HashMap::new()); + + let index_clone = index.clone(); + indexes.insert(index.name.clone(), index); + + // Перестраиваем индекс для существующих документов + self.rebuild_index(&index_clone.name)?; + + Ok(()) + } + + /// Перестроение индекса + fn rebuild_index(&self, index_name: &str) -> Result<()> { + let indexes = self.indexes.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + let index = indexes.get(index_name) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Index not found: {}", index_name) + ))?; + + let documents = self.documents.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + let mut index_map = std::collections::HashMap::new(); + + for (id, document_bytes) in documents.iter() { + if let Ok(document) = serde_json::from_slice::(document_bytes) { + if let Some(field_value) = document.get(&index.field) { + // Конвертируем значение в строку для использования в HashMap + let value_str = field_value.to_string(); + let entry = index_map.entry(value_str).or_insert_with(Vec::new); + entry.push(id.clone()); + + // Проверка уникальности для уникальных индексов + if index.unique && entry.len() > 1 { + return Err(crate::common::FutriixError::DatabaseError( + format!("Duplicate value {} for unique index {}", field_value, index_name) + )); + } + } + } + } + + self.index_data.insert(index_name.to_string(), index_map); + Ok(()) + } + + /// Обновление индекса при изменении документа + fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> { + let indexes = self.indexes.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + let new_doc_value: Value = serde_json::from_slice(new_document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + let old_doc_value: Option = old_document + .and_then(|doc| serde_json::from_slice(doc).ok()); + + for (index_name, index) in indexes.iter() { + if let Some(mut index_map) = self.index_data.get_mut(index_name) { + // Удаляем старые значения из индекса + if let Some(old_doc) = &old_doc_value { + if let Some(old_value) = old_doc.get(&index.field) { + let old_value_str = old_value.to_string(); + if let Some(entries) = index_map.get_mut(&old_value_str) { + entries.retain(|id| id != document_id); + if entries.is_empty() { + index_map.remove(&old_value_str); + } + } + } + } + + // Добавляем новые значения в индекс + if let Some(new_value) = new_doc_value.get(&index.field) { + let new_value_str = new_value.to_string(); + let entries = index_map.entry(new_value_str).or_insert_with(Vec::new); + + // Проверка уникальности + if index.unique && !entries.is_empty() && entries[0] != document_id { + return Err(crate::common::FutriixError::DatabaseError( + format!("Duplicate value {} for unique index {}", new_value, index_name) + )); + } + + if !entries.contains(&document_id.to_string()) { + entries.push(document_id.to_string()); + } + } + } + } + + Ok(()) + } + + /// Поиск по индексу + pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result> { + let index_map = self.index_data.get(index_name) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Index not found: {}", index_name) + ))?; + + let value_str = value.to_string(); + Ok(index_map.get(&value_str).cloned().unwrap_or_default()) + } + + /// Wait-Free создание документа с временной меткой + pub fn create_document(&self, document: Vec) -> Result { + let id = Uuid::new_v4().to_string(); + let seq = self.sequence.fetch_add(1, Ordering::SeqCst); + + // Добавляем временную метку к документу + let document_with_timestamp = self.add_timestamp_to_document(document, "create")?; + + // ИСПРАВЛЕНИЕ: Сначала проверяем индексы, потом вставляем документ + // Это предотвращает ситуацию, когда документ вставлен, но индексы не обновлены + self.update_indexes(None, &document_with_timestamp, &id)?; + + let mut documents = self.documents.write() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + // Проверяем, не существует ли уже документ с таким ID + if documents.contains_key(&id) { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document with ID {} already exists", id) + )); + } + + documents.insert(id.clone(), document_with_timestamp); + + // Логируем операцию + self.log_operation("create", &id); + + println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq); + Ok(id) + } + + /// Wait-Free чтение документа + pub fn read_document(&self, id: &str) -> Result>> { + let documents = self.documents.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + // Логируем операцию чтения + self.log_operation("read", id); + + Ok(documents.get(id).cloned()) + } + + /// Wait-Free обновление документа с временной меткой + pub fn update_document(&self, id: &str, document: Vec) -> Result<()> { + let seq = self.sequence.fetch_add(1, Ordering::SeqCst); + + // Добавляем временную метку к документу + let document_with_timestamp = self.add_timestamp_to_document(document, "update")?; + + let mut documents = self.documents.write() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + if let Some(old_document) = documents.get(id) { + // Обновляем индексы + self.update_indexes(Some(old_document), &document_with_timestamp, id)?; + + documents.insert(id.to_string(), document_with_timestamp); + + // Логируем операцию + self.log_operation("update", id); + + println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq); + Ok(()) + } else { + Err(crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + )) + } + } + + /// Wait-Free удаление документа с временной меткой + pub fn delete_document(&self, id: &str) -> Result<()> { + let seq = self.sequence.fetch_add(1, Ordering::SeqCst); + + let mut documents = self.documents.write() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + if let Some(old_document) = documents.get(id) { + // Удаляем из индексов + self.update_indexes(Some(old_document), &[], id)?; + + documents.remove(id); + + // Логируем операцию + self.log_operation("delete", id); + + println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq); + Ok(()) + } else { + Err(crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + )) + } + } + + /// Wait-Free запрос документов + pub fn query_documents(&self, _filter: Vec) -> Result>> { + let documents = self.documents.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + // Логируем операцию запроса + self.log_operation("query", "multiple"); + + // TODO: Реализовать wait-free фильтрацию на основе filter + let documents: Vec> = documents.values().cloned().collect(); + + Ok(documents) + } + + /// Получение имени коллекции (wait-free) + #[allow(dead_code)] + pub fn get_name(&self) -> &str { + &self.name + } + + /// Получение количества документов (wait-free) + #[allow(dead_code)] + pub fn count_documents(&self) -> Result { + let documents = self.documents.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + Ok(documents.len()) + } +} + +/// Wait-Free база данных +#[derive(Clone)] +pub struct Database { + collections: Arc>, + procedures: Arc>>, + transactions: Arc>>, +} + +impl Database { + /// Создание новой wait-free базы данных + pub fn new() -> Self { + Self { + collections: Arc::new(DashMap::new()), + procedures: Arc::new(DashMap::new()), + transactions: Arc::new(DashMap::new()), + } + } + + /// Wait-Free получение или создание коллекции + pub fn get_collection(&self, name: &str) -> Collection { + if let Some(collection) = self.collections.get(name) { + return collection.clone(); + } + + // Создаем новую коллекцию wait-free способом + let new_collection = Collection::new(name.to_string()); + self.collections.insert(name.to_string(), new_collection.clone()); + + new_collection + } + + /// Wait-Free выполнение команды + pub fn execute_command(&self, command: protocol::Command) -> Result { + match command { + protocol::Command::Create { collection, document } => { + let coll = self.get_collection(&collection); + match coll.create_document(document) { + Ok(id) => Ok(protocol::Response::Success(id.into_bytes())), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Read { collection, id } => { + let coll = self.get_collection(&collection); + match coll.read_document(&id) { + Ok(Some(document)) => Ok(protocol::Response::Success(document)), + Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Update { collection, id, document } => { + let coll = self.get_collection(&collection); + match coll.update_document(&id, document) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Delete { collection, id } => { + let coll = self.get_collection(&collection); + match coll.delete_document(&id) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Query { collection, filter } => { + let coll = self.get_collection(&collection); + match coll.query_documents(filter) { + Ok(documents) => { + let json_docs: Vec = documents.into_iter() + .filter_map(|doc| serde_json::from_slice(&doc).ok()) + .collect(); + + match serde_json::to_vec(&json_docs) { + Ok(data) => Ok(protocol::Response::Success(data)), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::CreateProcedure { name, code } => { + // ИСПРАВЛЕНИЕ: Сохраняем процедуру в procedures map вместо коллекции + self.procedures.insert(name, code); + Ok(protocol::Response::Success(vec![])) + } + protocol::Command::CallProcedure { name } => { + if let Some(code) = self.procedures.get(&name) { + // TODO: Выполнить Lua код процедуры + Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes())) + } else { + Ok(protocol::Response::Error(format!("Procedure not found: {}", name))) + } + } + protocol::Command::BeginTransaction { transaction_id } => { + if self.transactions.contains_key(&transaction_id) { + return Ok(protocol::Response::Error("Transaction already exists".to_string())); + } + + self.transactions.insert(transaction_id, Vec::new()); + Ok(protocol::Response::Success(vec![])) + } + protocol::Command::CommitTransaction { transaction_id } => { + if let Some((_, commands)) = self.transactions.remove(&transaction_id) { + // ИСПРАВЛЕНИЕ: Выполняем все команды транзакции в правильном порядке + let mut results = Vec::new(); + for cmd in commands { + match self.execute_command(cmd) { + Ok(response) => results.push(response), + Err(e) => { + // Если одна команда не выполнилась, откатываем всю транзакцию + return Ok(protocol::Response::Error(format!("Transaction failed: {}", e))); + } + } + } + Ok(protocol::Response::Success(vec![])) + } else { + Ok(protocol::Response::Error("Transaction not found".to_string())) + } + } + protocol::Command::RollbackTransaction { transaction_id } => { + if self.transactions.remove(&transaction_id).is_some() { + Ok(protocol::Response::Success(vec![])) + } else { + Ok(protocol::Response::Error("Transaction not found".to_string())) + } + } + protocol::Command::CreateIndex { collection, index } => { + let coll = self.get_collection(&collection); + match coll.create_index(index) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::QueryByIndex { collection, index_name, value } => { + let coll = self.get_collection(&collection); + let value: Value = serde_json::from_slice(&value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + match coll.query_by_index(&index_name, &value) { + Ok(document_ids) => { + let result = serde_json::to_vec(&document_ids) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + Ok(protocol::Response::Success(result)) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + // Обработка остальных команд... + _ => { + // TODO: Реализовать обработку остальных команд + Ok(protocol::Response::Error("Command not implemented".to_string())) + } + } + } + + /// Wait-Free получение статистики базы данных + #[allow(dead_code)] + pub fn get_stats(&self) -> Result> { + let mut stats = std::collections::HashMap::new(); + stats.insert("collections".to_string(), self.collections.len()); + stats.insert("procedures".to_string(), self.procedures.len()); + stats.insert("active_transactions".to_string(), self.transactions.len()); + + // Подсчет документов во всех коллекциях + let total_documents: usize = self.collections.iter() + .map(|entry| entry.value().count_documents().unwrap_or(0)) + .sum(); + + stats.insert("total_documents".to_string(), total_documents); + + Ok(stats) + } + + /// Создание бэкапа базы данных + pub fn create_backup(&self) -> Result>>> { + let mut backup = std::collections::HashMap::new(); + + for entry in self.collections.iter() { + let name = entry.key().clone(); + let collection = entry.value(); + + let documents = collection.documents.read() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + let mut collection_backup = std::collections::HashMap::new(); + for (id, document) in documents.iter() { + collection_backup.insert(id.clone(), document.clone()); + } + + backup.insert(name, collection_backup); + } + + Ok(backup) + } + + /// Восстановление из бэкапа + pub fn restore_from_backup(&self, backup: std::collections::HashMap>>) -> Result<()> { + // Очищаем существующие коллекции + self.collections.clear(); + + // Восстанавливаем данные из бэкапа + for (collection_name, documents) in backup { + let collection = Collection::new(collection_name.clone()); + + { + let mut collection_docs = collection.documents.write() + .map_err(|e| crate::common::FutriixError::IoError( + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + ))?; + + for (id, document) in documents { + collection_docs.insert(id, document); + } + } + + self.collections.insert(collection_name, collection); + } + + Ok(()) + } + + /// Добавление триггера к коллекции + pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { + let collection = self.get_collection(&trigger.collection); + collection.add_trigger(trigger) + } +}