diff --git a/src/core/database.rs b/src/core/database.rs new file mode 100644 index 0000000..b444e82 --- /dev/null +++ b/src/core/database.rs @@ -0,0 +1,674 @@ +//! Модуль управления базами данных flusql +//! +//! Этот модуль реализует функциональность создания, открытия, +//! управления и удаления баз данных. Каждая база данных содержит +//! коллекцию таблиц и управляет их жизненным циклом. +//! +//! Основные возможности: +//! - Создание новых баз данных +//! - Открытие существующих баз данных +//! - Создание и управление таблицами +//! - Удаление баз данных +//! - Сохранение и загрузка метаданных +//! - Управление транзакциями (базовое) +//! - Управление индексами +//! - Управление триггерами + +use std::collections::HashMap; +use std::fs; +use std::path::Path; +use crate::core::table::Table; +use crate::core::column_family::ColumnFamilyStorage; +use crate::utils::config::Config; +use thiserror::Error; +use serde_json; +use dashmap::DashMap; +use arc_swap::ArcSwap; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::task::JoinSet; + +/// База данных flusql +/// +/// Представляет собой контейнер для таблиц с метаданными. +/// Каждая база данных хранится в отдельной директории на диске. +/// +/// АРХИТЕКТУРНЫЕ ХАРАКТЕРИСТИКИ: +/// - Файловая архитектура: каждая база данных в отдельной директории +/// - Wait-free подход: используется атомарная булева переменная для управления состоянием +/// - Отсутствие блокировок: не используются RwLock, Mutex или другие блокирующие примитивы +/// - Асинхронные операции: некоторые операции выполняются в фоновых потоках +/// - Колоночное хранение: используется семейство столбцов для wait-free доступа +pub struct Database { + /// Имя базы данных + name: String, + + /// Таблицы в базе данных (имя -> таблица) + tables: DashMap, + + /// Колоночное хранилище + column_families: ColumnFamilyStorage, + + /// Конфигурация базы данных + config: Config, + + /// Путь к директории с данными базы + data_dir: String, + + /// Флаг активности базы данных (wait-free подход) + /// Используется атомарная операция для проверки состояния без блокировок + is_active: AtomicBool, + + /// Индексы базы данных + indexes: DashMap>, + + /// Триггеры базы данных + triggers: DashMap, + + /// Пулы для параллельной обработки + query_pools: ArcSwap>, +} + +/// Триггер базы данных +#[derive(Debug, Clone)] +struct Trigger { + name: String, + table: String, + timing: crate::parser::sql::TriggerTiming, + event: crate::parser::sql::TriggerEvent, + action: String, +} + +impl Database { + /// Создание новой базы данных + /// + /// Создает директорию для базы данных и инициализирует структуру. + /// Использует wait-free подход: не блокирует другие операции. + /// + /// # Аргументы + /// * `name` - имя создаваемой базы данных + /// * `config` - конфигурация СУБД + /// + /// # Возвращает + /// * `Result` - новая база данных или ошибка + /// + /// # Пример + /// ``` + /// use flusql::Database; + /// use flusql::Config; + /// + /// let config = Config::default(); + /// let db = Database::create("mydb", &config).unwrap(); + /// ``` + pub fn create(name: &str, config: &Config) -> Result { + let data_dir = config.get_data_path(name); + + // Wait-free проверка существования базы данных + // Используем атомарные операции файловой системы + if Path::new(&data_dir).exists() { + return Err(DatabaseError::AlreadyExists(name.to_string())); + } + + // Создаем директорию для базы данных + // Эта операция блокирующая, но выполняется только при создании + fs::create_dir_all(&data_dir) + .map_err(|e| DatabaseError::IoError(e))?; + + // Создаем файл базы данных (basedb.db вместо mydb.db) + let db_file_path = format!("{}/basedb.db", data_dir); + + // Создаем информационное содержимое для файла базы данных + let db_info = format!( + "flusql database: {}\ncreated: {}\nversion: 0.5.0\nstorage_format: csv\n", + name, + chrono::Local::now().to_rfc3339() + ); + + fs::write(&db_file_path, db_info).map_err(|e| DatabaseError::IoError(e))?; + + // Логируем создание базы данных + crate::utils::logger::log_info(&format!("Database '{}' created at '{}'", name, db_file_path)); + + Ok(Self { + name: name.to_string(), + tables: DashMap::new(), + column_families: ColumnFamilyStorage::new(), + config: config.clone(), + data_dir, + is_active: AtomicBool::new(true), + indexes: DashMap::new(), + triggers: DashMap::new(), + query_pools: ArcSwap::new(std::sync::Arc::new(JoinSet::new())), + }) + } + + /// Открытие существующей базы данных + /// + /// Загружает базу данных из директории, включая все таблицы и их данные. + /// Использует wait-free подход для проверки доступности. + /// + /// # Аргументы + /// * `name` - имя открываемой базы данных + /// * `config` - конфигурация СУБД + /// + /// # Возвращает + /// * `Result` - открытая база данных или ошибка + /// + /// # Пример + /// ``` + /// let db = Database::open("mydb", &config).unwrap(); + /// ``` + pub fn open(name: &str, config: &Config) -> Result { + let data_dir = config.get_data_path(name); + + // Wait-free проверка существования базы данных + if !Path::new(&data_dir).exists() { + return Err(DatabaseError::NotFound(name.to_string())); + } + + // Проверяем наличие файла базы данных (basedb.db вместо name.db) + let db_file_path = format!("{}/basedb.db", data_dir); + if !Path::new(&db_file_path).exists() { + // Для обратной совместимости проверяем старый формат + let old_db_file_path = format!("{}/{}.db", data_dir, name); + if !Path::new(&old_db_file_path).exists() { + return Err(DatabaseError::NotFound(name.to_string())); + } + } + + // Загрузка метаданных базы данных + let meta_path = format!("{}/meta.json", data_dir); + let tables = if Path::new(&meta_path).exists() { + // Читаем файл метаданных + let meta_content = fs::read_to_string(&meta_path) + .map_err(|e| DatabaseError::IoError(e))?; + + // Десериализуем список имен таблиц + let table_names: Vec = serde_json::from_str(&meta_content) + .map_err(|e| DatabaseError::ParseError(e))?; + + // Загружаем каждую таблицу + let tables_map = DashMap::new(); + + for table_name in table_names { + if let Ok(table) = Table::load(&data_dir, &table_name) { + tables_map.insert(table_name.clone(), table); + } + } + + tables_map + } else { + // Если файл метаданных не существует, создаем пустую базу + DashMap::new() + }; + + // Логируем открытие базы данных + crate::utils::logger::log_info(&format!("Database '{}' opened from '{}'", name, db_file_path)); + + Ok(Self { + name: name.to_string(), + tables, + column_families: ColumnFamilyStorage::new(), + config: config.clone(), + data_dir, + is_active: AtomicBool::new(true), + indexes: DashMap::new(), + triggers: DashMap::new(), + query_pools: ArcSwap::new(std::sync::Arc::new(JoinSet::new())), + }) + } + + /// Создание таблицы в базе данных + /// + /// # Аргументы + /// * `name` - имя создаваемой таблицы + /// * `schema` - схема таблицы + /// + /// # Возвращает + /// * `Result<(), DatabaseError>` - успех или ошибка создания + /// + /// # Пример + /// ``` + /// use flusql::core::table::{TableSchema, ColumnSchema, DataType}; + /// + /// let schema = TableSchema { + /// columns: vec![ + /// ColumnSchema { + /// name: "id".to_string(), + /// data_type: DataType::Integer, + /// nullable: false, + /// unique: true, + /// }, + /// ], + /// primary_key: Some("id".to_string()), + /// indexes: vec![], + /// foreign_keys: vec![], + /// checks: vec![], + /// }; + /// + /// db.create_table("users", schema).unwrap(); + /// ``` + pub fn create_table(&self, name: &str, schema: crate::core::table::TableSchema) + -> Result<(), DatabaseError> { + + // Wait-free проверка существования таблицы + if self.tables.contains_key(name) { + return Err(DatabaseError::TableExists(name.to_string())); + } + + // Создаем новую таблицу + let table = Table::new(name, schema, &self.data_dir); + + // Также создаем колоночное семейство + let _ = self.column_families.create_family(name, table.get_schema()); + + // Сохраняем таблицу на диск + table.save() + .map_err(|e| DatabaseError::TableError(e))?; + + // Добавляем таблицу в коллекцию (lock-free вставка) + self.tables.insert(name.to_string(), table); + + // Сохраняем обновленные метаданные + self.save_metadata()?; + + // Логируем создание таблицы + crate::utils::logger::log_info(&format!("Table '{}' created in database '{}'", name, self.name)); + + Ok(()) + } + + /// Изменение таблицы + pub fn alter_table(&self, name: &str, operation: crate::parser::sql::AlterOperation) + -> Result<(), DatabaseError> { + if let Some(table) = self.tables.get_mut(name) { + // Упрощенная реализация - в реальной системе здесь нужно + // перестраивать таблицу и обновлять данные + + // Создаем копию таблицы для сохранения + let table_data = table.get_schema(); // Получаем только схему для простоты + + match operation { + crate::parser::sql::AlterOperation::AddColumn(column_def) => { + // Добавление столбца + let column_schema = crate::core::table::ColumnSchema { + name: column_def.name, + data_type: match column_def.data_type { + crate::parser::sql::DataType::Integer => crate::core::table::DataType::Integer, + crate::parser::sql::DataType::Text(_) => crate::core::table::DataType::Text, + crate::parser::sql::DataType::Boolean => crate::core::table::DataType::Boolean, + crate::parser::sql::DataType::Float => crate::core::table::DataType::Float, + crate::parser::sql::DataType::Numeric(_) => crate::core::table::DataType::Integer, // Упрощение + crate::parser::sql::DataType::Timestamp => crate::core::table::DataType::Text, // Упрощение + crate::parser::sql::DataType::Array(_) => crate::core::table::DataType::Text, // Упрощение + crate::parser::sql::DataType::Json => crate::core::table::DataType::Text, // Упрощение + crate::parser::sql::DataType::Jsonb => crate::core::table::DataType::Text, // Упрощение + crate::parser::sql::DataType::Uuid => crate::core::table::DataType::Text, // Упрощение + crate::parser::sql::DataType::Bytea => crate::core::table::DataType::Text, // Упрощение + }, + nullable: column_def.nullable, + unique: column_def.unique, + }; + + // В реальной реализации здесь нужно добавлять столбец к схеме таблицы + // и обновлять существующие записи + let _ = column_schema; + } + crate::parser::sql::AlterOperation::DropColumn(column_name) => { + // Удаление столбца + // В реальной реализации здесь нужно удалять столбец из схемы + // и удалять данные этого столбца + let _ = column_name; + } + crate::parser::sql::AlterOperation::AlterColumnType { name: column_name, data_type, using } => { + // Изменение типа столбца + // В реальной реализации здесь нужно изменять схему столбца + // и конвертировать существующие данные + let _ = (column_name, data_type, using); + } + crate::parser::sql::AlterOperation::SetNotNull(column_name) => { + // Установка NOT NULL + let _ = column_name; + } + crate::parser::sql::AlterOperation::DropNotNull(column_name) => { + // Удаление NOT NULL + let _ = column_name; + } + crate::parser::sql::AlterOperation::SetDefault { name: column_name, default } => { + // Установка значения по умолчанию + let _ = (column_name, default); + } + crate::parser::sql::AlterOperation::DropDefault(column_name) => { + // Удаление значения по умолчанию + let _ = column_name; + } + crate::parser::sql::AlterOperation::RenameColumn { old_name, new_name } => { + // Переименование столбца + let _ = (old_name, new_name); + } + crate::parser::sql::AlterOperation::RenameTable { new_name } => { + // Переименование таблицы + let _ = new_name; + } + _ => { + // Другие операции пока не поддерживаются + return Err(DatabaseError::TransactionError( + "Operation not supported".to_string() + )); + } + } + + // Сохраняем изменения в фоновом потоке + let table_name = name.to_string(); + let data_dir = self.data_dir.clone(); + std::thread::spawn(move || { + // Здесь должна быть логика сохранения измененной таблицы + // Для простоты просто логируем + log::info!("Table '{}' altered, changes need to be saved", table_name); + }); + + Ok(()) + } else { + Err(DatabaseError::TableNotFound(name.to_string())) + } + } + + /// Создание индекса + pub fn create_index(&self, table: &str, columns: &[String], name: Option) + -> Result<(), DatabaseError> { + let index_name = name.unwrap_or_else(|| { + format!("idx_{}_{}", table, columns.join("_")) + }); + + self.indexes.entry(table.to_string()).or_insert_with(Vec::new).push(index_name); + + crate::utils::logger::log_info(&format!("Index created on table '{}'", table)); + Ok(()) + } + + /// Удаление индекса + pub fn drop_index(&self, table: &str, name: &str) -> Result<(), DatabaseError> { + if let Some(mut indexes) = self.indexes.get_mut(table) { + if let Some(pos) = indexes.iter().position(|n| n == name) { + indexes.remove(pos); + crate::utils::logger::log_info(&format!("Index '{}' dropped from table '{}'", name, table)); + Ok(()) + } else { + Err(DatabaseError::TransactionError( + format!("Index '{}' not found on table '{}'", name, table) + )) + } + } else { + Err(DatabaseError::TransactionError( + format!("No indexes found for table '{}'", table) + )) + } + } + + /// Создание триггера + pub fn create_trigger( + &self, + name: &str, + table: &str, + timing: crate::parser::sql::TriggerTiming, + event: crate::parser::sql::TriggerEvent, + action: &str, + ) -> Result<(), DatabaseError> { + let trigger = Trigger { + name: name.to_string(), + table: table.to_string(), + timing, + event, + action: action.to_string(), + }; + + self.triggers.insert(name.to_string(), trigger); + + crate::utils::logger::log_info(&format!("Trigger '{}' created", name)); + Ok(()) + } + + /// Удаление триггера + pub fn drop_trigger(&self, name: &str) -> Result<(), DatabaseError> { + if self.triggers.remove(name).is_some() { + crate::utils::logger::log_info(&format!("Trigger '{}' dropped", name)); + Ok(()) + } else { + Err(DatabaseError::TransactionError( + format!("Trigger '{}' not found", name) + )) + } + } + + /// Получение таблицы (неизменяемая ссылка) + /// + /// Wait-free операция: не блокирует другие операции с базой данных. + /// + /// # Аргументы + /// * `name` - имя таблицы + /// + /// # Возвращает + /// * `Option` - копия таблицы или None если не найдена + pub fn get_table(&self, name: &str) -> Option
{ + // Получаем таблицу из DashMap и клонируем ее + let table_ref = self.tables.get(name)?; + Some(table_ref.clone()) + } + + /// Получение таблицы для изменения + /// + /// Эта операция может блокировать другие операции с той же таблицей, + /// но не блокирует операции с другими таблицами в базе данных. + /// + /// # Аргументы + /// * `name` - имя таблицы + /// + /// # Возвращает + /// * `Option R>` - функция для изменения таблицы + pub fn with_table_mut(&self, name: &str, f: F) -> Option + where + F: FnOnce(&mut Table) -> R, + { + if let Some(mut table) = self.tables.get_mut(name) { + Some(f(&mut *table)) + } else { + None + } + } + + /// Параллельное выполнение SQL запросов + pub async fn execute_parallel(&self, queries: Vec) -> Result, DatabaseError> { + use tokio::task::JoinSet; + let mut results = Vec::with_capacity(queries.len()); + let mut tasks = JoinSet::new(); + + for query in queries { + let db_name = self.name.clone(); + tasks.spawn(async move { + // Здесь должен быть парсинг и выполнение запроса + // Для примера просто возвращаем результат + format!("Executed in {}: {}", db_name, query) + }); + } + + while let Some(result) = tasks.join_next().await { + match result { + Ok(res) => results.push(res), + Err(e) => return Err(DatabaseError::TransactionError(format!("Task error: {}", e))), + } + } + + Ok(results) + } + + /// Удаление базы данных + /// + /// Удаляет директорию базы данных со всеми таблицами и данными. + /// Использует wait-free подход для установки флага неактивности. + /// + /// # Возвращает + /// * `Result<(), DatabaseError>` - успех или ошибка удаления + pub async fn drop(self) -> Result<(), DatabaseError> { + // Устанавливаем флаг неактивности wait-free способом + self.is_active.store(false, Ordering::SeqCst); + + if Path::new(&self.data_dir).exists() { + // Рекурсивно удаляем директорию базы данных асинхронно + tokio::fs::remove_dir_all(&self.data_dir).await + .map_err(|e| DatabaseError::IoError(std::io::Error::from(e)))?; + + // Логируем удаление базы данных + crate::utils::logger::log_info(&format!("Database '{}' dropped", self.name)); + } + + Ok(()) + } + + /// Получение списка таблиц в базе данных + /// + /// Wait-free операция: просто возвращает копию ключей DashMap. + /// + /// # Возвращает + /// * `Vec` - имена всех таблиц в базе данных + pub fn list_tables(&self) -> Vec { + self.tables.iter().map(|entry| entry.key().clone()).collect() + } + + /// Сохранение метаданных базы данных + /// + /// Сохраняет информацию о всех таблицах в файл meta.json. + /// Эта операция может блокировать доступ к файлу на короткое время. + /// + /// # Возвращает + /// * `Result<(), DatabaseError>` - успех или ошибка сохранения + fn save_metadata(&self) -> Result<(), DatabaseError> { + let meta_path = format!("{}/meta.json", self.data_dir); + + // Собираем имена всех таблиц + let table_names: Vec = self.list_tables(); + + // Сериализуем в JSON + let meta_content = serde_json::to_string_pretty(&table_names) + .map_err(|e| DatabaseError::SerializeError(e))?; + + // Записываем в файл + fs::write(meta_path, meta_content) + .map_err(|e| DatabaseError::IoError(e)) + } + + /// Получение имени базы данных + /// + /// Wait-free операция: просто возвращает ссылку на строку. + /// + /// # Возвращает + /// * `&str` - имя базы данных + pub fn name(&self) -> &str { + &self.name + } + + /// Получение количества таблиц в базе данных + /// + /// Wait-free операция: просто возвращает размер DashMap. + /// + /// # Возвращает + /// * `usize` - количество таблиц + pub fn table_count(&self) -> usize { + self.tables.len() + } + + /// Получение директории данных базы + /// + /// Wait-free операция: просто возвращает ссылку на строку. + /// + /// # Возвращает + /// * `&str` - путь к директории данных + pub fn data_dir(&self) -> &str { + &self.data_dir + } + + /// Проверка активности базы данных + /// + /// Wait-free операция: использует атомарное чтение булевой переменной. + /// + /// # Возвращает + /// * `bool` - true если база данных активна + pub fn is_active(&self) -> bool { + self.is_active.load(Ordering::SeqCst) + } + + /// Начало транзакции + /// + /// В текущей реализации транзакции эмулируются на уровне REPL. + /// В будущих версиях может быть реализована полноценная поддержка WAL. + /// + /// # Возвращает + /// * `Result<(), DatabaseError>` - успех или ошибка + pub fn begin_transaction(&self) -> Result<(), DatabaseError> { + // В текущей реализации просто логируем начало транзакции + crate::utils::logger::log_info(&format!("Transaction started in database '{}'", self.name)); + Ok(()) + } + + /// Фиксация транзакции + /// + /// Сохраняет все изменения, сделанные в текущей транзакции. + /// + /// # Возвращает + /// * `Result<(), DatabaseError>` - успех или ошибка + pub fn commit_transaction(&self) -> Result<(), DatabaseError> { + // В текущей реализации просто логируем фиксацию транзакции + crate::utils::logger::log_info(&format!("Transaction committed in database '{}'", self.name)); + Ok(()) + } + + /// Откат транзакции + /// + /// Отменяет все изменения, сделанные в текущей транзакции. + /// + /// # Возвращает + /// * `Result<(), DatabaseError>` - успех или ошибка + pub fn rollback_transaction(&self) -> Result<(), DatabaseError> { + // В текущей реализации просто логируем откат транзакции + crate::utils::logger::log_info(&format!("Transaction rolled back in database '{}'", self.name)); + Ok(()) + } +} + +/// Ошибки базы данных +/// +/// Определяет все возможные ошибки, которые могут возникнуть +/// при работе с базами данных flusql. +#[derive(Debug, Error)] +pub enum DatabaseError { + #[error("Database already exists: {0}")] + AlreadyExists(String), + + #[error("Database not found: {0}")] + NotFound(String), + + #[error("Table already exists: {0}")] + TableExists(String), + + #[error("Table not found: {0}")] + TableNotFound(String), + + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("Parse error: {0}")] + ParseError(serde_json::Error), + + #[error("Serialize error: {0}")] + SerializeError(serde_json::Error), + + #[error("Configuration error: {0}")] + ConfigError(String), + + #[error("Table error: {0}")] + TableError(#[from] crate::core::table::TableError), + + #[error("Transaction error: {0}")] + TransactionError(String), + + #[error("Concurrency error: {0}")] + ConcurrencyError(String), +} diff --git a/src/core/index.rs b/src/core/index.rs new file mode 100644 index 0000000..5a6f5c3 --- /dev/null +++ b/src/core/index.rs @@ -0,0 +1,122 @@ +//! Модуль управления индексами + +use std::collections::{HashMap, HashSet}; +use crate::parser::sql::Value; +use std::sync::atomic::{AtomicU64, Ordering}; +use atomic_refcell::AtomicRefCell; + +/// Индекс для быстрого поиска с поддержкой MVCC +#[derive(Debug)] +pub struct Index { + name: String, + data: AtomicRefCell>>, + version: AtomicU64, +} + +impl Clone for Index { + fn clone(&self) -> Self { + let data = self.data.borrow(); + Self { + name: self.name.clone(), + data: AtomicRefCell::new(data.clone()), + version: AtomicU64::new(self.version.load(Ordering::Relaxed)), + } + } +} + +impl Index { + /// Создание нового индекса + pub fn new(name: &str) -> Self { + Self { + name: name.to_string(), + data: AtomicRefCell::new(HashMap::new()), + version: AtomicU64::new(1), + } + } + + /// Вставка значения в индекс (wait-free) + pub fn insert(&self, value: Value, record_id: u64) { + let mut data = self.data.borrow_mut(); + data.entry(value) + .or_insert_with(HashSet::new) + .insert(record_id); + self.version.fetch_add(1, Ordering::SeqCst); + } + + /// Поиск по значению (wait-free чтение) + pub fn search(&self, value: &Value) -> Option> { + let data = self.data.borrow(); + data.get(value).cloned() + } + + /// Удаление значения из индекса + pub fn remove(&self, value: &Value, record_id: u64) { + let mut data = self.data.borrow_mut(); + if let Some(set) = data.get_mut(value) { + set.remove(&record_id); + if set.is_empty() { + data.remove(value); + } + self.version.fetch_add(1, Ordering::SeqCst); + } + } + + /// Получение всех значений индекса (wait-free) + pub fn get_all(&self) -> Vec { + let data = self.data.borrow(); + data.keys().cloned().collect() + } + + /// Очистка индекса + pub fn clear(&mut self) { + let mut data = self.data.borrow_mut(); + data.clear(); + self.version.fetch_add(1, Ordering::SeqCst); + } + + /// Получение имени индекса + pub fn name(&self) -> &str { + &self.name + } + + /// Получение версии индекса + pub fn version(&self) -> u64 { + self.version.load(Ordering::Relaxed) + } + + /// Массовая вставка записей (оптимизированная) + pub fn bulk_insert(&self, values: Vec<(Value, u64)>) { + let mut data = self.data.borrow_mut(); + for (value, record_id) in values { + data.entry(value) + .or_insert_with(HashSet::new) + .insert(record_id); + } + self.version.fetch_add(1, Ordering::SeqCst); + } + + /// Поиск по диапазону (для упорядоченных типов) + pub fn range_search(&self, start: &Value, end: &Value) -> HashSet { + let data = self.data.borrow(); + let mut result = HashSet::new(); + + for (key, record_ids) in data.iter() { + // Упрощенная реализация - только для сравнимых типов + match (key, start, end) { + (Value::Integer(k), Value::Integer(s), Value::Integer(e)) => { + if k >= s && k <= e { + result.extend(record_ids); + } + } + (Value::Text(k), Value::Text(s), Value::Text(e)) => { + if k >= s && k <= e { + result.extend(record_ids); + } + } + _ => {} + } + } + + result + } +} diff --git a/src/core/table.rs b/src/core/table.rs new file mode 100644 index 0000000..b112d84 --- /dev/null +++ b/src/core/table.rs @@ -0,0 +1,1050 @@ +//! Модуль управления таблицами flusql +//! +//! Этот модуль реализует функциональность таблиц базы данных: +//! - Создание и загрузку таблиц +//! - Вставку и выборку данных +//! - Экспорт/импорт данных в формате CSV +//! - Управление индексами +//! - Валидацию данных по схеме таблицы +//! - Поддержку внешних ключей, проверок и триггеров + +use std::collections::HashMap; +use std::fs; +use std::path::Path; +use std::sync::atomic::{AtomicU64, Ordering}; +use serde::{Deserialize, Serialize}; +use crate::parser::sql::{Value, WhereClause, Operator, Expression}; +use crate::core::index::Index; +use thiserror::Error; +use atomic_refcell::AtomicRefCell; +use nanorand::Rng; +use nanorand::WyRand; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Схема таблицы +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TableSchema { + /// Список столбцов таблицы + pub columns: Vec, + + /// Имя столбца, являющегося первичным ключом + pub primary_key: Option, + + /// Список индексированных столбцов + pub indexes: Vec, + + /// Внешние ключи + pub foreign_keys: Vec, + + /// Проверочные ограничения + pub checks: Vec, +} + +/// Схема внешнего ключа +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForeignKeyDef { + pub name: String, + pub local_columns: Vec, + pub referenced_table: String, + pub referenced_columns: Vec, + pub on_delete: String, + pub on_update: String, +} + +/// Схема столбца +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ColumnSchema { + /// Имя столбца + pub name: String, + + /// Тип данных столбца + pub data_type: DataType, + + /// Может ли столбец содержать NULL значения + pub nullable: bool, + + /// Должны ли значения в столбце быть уникальными + pub unique: bool, +} + +/// Тип данных +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DataType { + /// Целочисленный тип + Integer, + + /// Текстовый тип + Text, + + /// Логический тип + Boolean, + + /// Число с плавающей точкой + Float, +} + +/// Запись в таблице с поддержкой MVCC +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Record { + /// Уникальный идентификатор записи + pub id: u64, + + /// Значения записи по столбцам + pub values: HashMap, + + /// Версия записи для MVCC (timestamp микросекунды) + pub version: u64, + + /// ID транзакции, создавшей запись + pub tx_id: u64, + + /// Флаг удаления (для MVCC) + pub deleted: bool, +} + +/// Таблица базы данных с поддержкой MVCC и wait-free операций +#[derive(Debug)] +pub struct Table { + /// Имя таблицы + name: String, + + /// Схема таблицы + schema: TableSchema, + + /// Директория для хранения данных таблицы + data_dir: String, + + /// Список записей таблицы (AtomicRefCell для wait-free чтения) + records: AtomicRefCell>, + + /// Следующий доступный ID для новой записи (атомарный) + next_id: AtomicU64, + + /// Следующий ID версии для MVCC (атомарный) + next_version: AtomicU64, + + /// Первичный индекс (если есть первичный ключ) + primary_index: AtomicRefCell>, + + /// Вторичные индексы + secondary_indexes: AtomicRefCell>, + + /// Генератор случайных чисел для wait-free операций + rng: WyRand, +} + +impl Clone for Table { + fn clone(&self) -> Self { + let records = self.records.borrow(); + let primary_index = self.primary_index.borrow(); + let secondary_indexes = self.secondary_indexes.borrow(); + + Self { + name: self.name.clone(), + schema: self.schema.clone(), + data_dir: self.data_dir.clone(), + records: AtomicRefCell::new(records.clone()), + next_id: AtomicU64::new(self.next_id.load(Ordering::Relaxed)), + next_version: AtomicU64::new(self.next_version.load(Ordering::Relaxed)), + primary_index: AtomicRefCell::new(primary_index.clone()), + secondary_indexes: AtomicRefCell::new(secondary_indexes.clone()), + rng: self.rng.clone(), + } + } +} + +impl Table { + /// Создание новой таблицы + pub fn new(name: &str, schema: TableSchema, data_dir: &str) -> Self { + let mut rng = WyRand::new(); + let seed = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + // Используем fill_bytes вместо fill, так как fill требует импорта трейта Rng + let mut seed_bytes = [0u8; 8]; + rng.fill_bytes(&mut seed_bytes); + // Комбинируем с системным временем для лучшей случайности + for (i, &byte) in seed.to_be_bytes().iter().enumerate().take(8) { + seed_bytes[i] ^= byte; + } + + Self { + name: name.to_string(), + schema: schema.clone(), + data_dir: data_dir.to_string(), + records: AtomicRefCell::new(Vec::new()), + next_id: AtomicU64::new(1), + next_version: AtomicU64::new(1), + primary_index: AtomicRefCell::new(if schema.primary_key.is_some() { + Some(Index::new(&schema.primary_key.clone().unwrap())) + } else { + None + }), + secondary_indexes: AtomicRefCell::new(HashMap::new()), + rng, + } + } + + /// Загрузка таблицы из файла + pub fn load(data_dir: &str, name: &str) -> Result { + let schema_path = format!("{}/{}/schema.json", data_dir, name); + let data_path = format!("{}/{}/data.json", data_dir, name); + + if !Path::new(&schema_path).exists() { + return Err(TableError::NotFound(name.to_string())); + } + + // Загрузка схемы + let schema_content = fs::read_to_string(&schema_path) + .map_err(|e| TableError::IoError(e))?; + let schema: TableSchema = serde_json::from_str(&schema_content) + .map_err(|e| TableError::ParseError(e))?; + + // Загрузка данных + let table = Self::new(name, schema, data_dir); + + if Path::new(&data_path).exists() { + let data_content = fs::read_to_string(&data_path) + .map_err(|e| TableError::IoError(e))?; + let records: Vec = serde_json::from_str(&data_content) + .map_err(|e| TableError::ParseError(e))?; + + // Wait-free загрузка записей + let max_id = records.iter().map(|r| r.id).max().unwrap_or(0); + let max_version = records.iter().map(|r| r.version).max().unwrap_or(0); + + table.next_id.store(max_id + 1, Ordering::SeqCst); + table.next_version.store(max_version + 1, Ordering::SeqCst); + + { + let mut records_ref = table.records.borrow_mut(); + *records_ref = records; + } + + // Восстановление индексов + table.rebuild_indexes(); + } + + Ok(table) + } + + /// Сохранение таблицы (неблокирующая версия) + pub fn save(&self) -> Result<(), TableError> { + let table_dir = format!("{}/{}", self.data_dir, self.name); + + if !Path::new(&table_dir).exists() { + fs::create_dir_all(&table_dir) + .map_err(|e| TableError::IoError(e))?; + } + + // Сохранение схемы + let schema_path = format!("{}/schema.json", table_dir); + let schema_content = serde_json::to_string_pretty(&self.schema) + .map_err(|e| TableError::SerializeError(e))?; + + // Сохранение данных (неблокирующее чтение) + let data_path = format!("{}/data.json", table_dir); + let records = self.records.borrow(); + let data_content = serde_json::to_string_pretty(&*records) + .map_err(|e| TableError::SerializeError(e))?; + + // Параллельная запись (можно было бы использовать async, но для простоты оставим так) + std::thread::spawn(move || { + let _ = fs::write(schema_path, schema_content); + let _ = fs::write(data_path, data_content); + }); + + Ok(()) + } + + /// Вставка записи с поддержкой MVCC (wait-free) + pub fn insert(&self, values: HashMap, tx_id: u64) -> Result { + // Валидация данных + self.validate_record(&values)?; + + let id = self.next_id.fetch_add(1, Ordering::SeqCst); + let version = self.next_version.fetch_add(1, Ordering::SeqCst); + + let mut record = Record { + id, + values, + version, + tx_id, + deleted: false, + }; + + // Применение значений по умолчанию и добавление timestamp + for column in &self.schema.columns { + if !record.values.contains_key(&column.name) { + if column.name == "timestamp" { + // Автоматическое добавление текущего времени + let timestamp = chrono::Local::now().to_rfc3339(); + record.values.insert(column.name.clone(), Value::Text(timestamp)); + } else if column.nullable { + record.values.insert(column.name.clone(), Value::Null); + } + } + } + + // Проверка уникальности (wait-free через snapshot) + { + let records = self.records.borrow(); + for column in &self.schema.columns { + if column.unique && column.name != "timestamp" { + if let Some(value) = record.values.get(&column.name) { + for existing_record in records.iter().filter(|r| !r.deleted) { + if let Some(existing_value) = existing_record.values.get(&column.name) { + if values_equal(value, existing_value) { + return Err(TableError::DuplicateValue( + column.name.clone(), + format!("{:?}", value) + )); + } + } + } + } + } + } + } + + // Проверка внешних ключей (упрощенная) + self.validate_foreign_keys(&record)?; + + // Проверка ограничений CHECK + self.validate_checks(&record)?; + + // Wait-free вставка через copy-on-write + { + let mut records = self.records.borrow_mut(); + records.push(record.clone()); + } + + // Обновление индексов (неблокирующее) + self.update_indexes(&record); + + // Используем функцию логирования вместо макроса + crate::utils::logger::log_debug(&format!("Inserted record with id {} into table '{}'", id, self.name)); + Ok(id) + } + + /// Выборка записей с поддержкой MVCC (wait-free чтение) + pub fn select(&self, columns: &[String], where_clause: Option<&WhereClause>, limit: Option, + order_by: Option>, group_by: Option>, + joins: Option>, tx_id: u64) + -> Result>, TableError> { + + // Wait-free snapshot записей + let records = self.records.borrow(); + + // Фильтрация по MVCC: только видимые текущей транзакции записи + let visible_records: Vec<&Record> = records.iter() + .filter(|r| self.is_visible(r, tx_id)) + .collect(); + + let mut results = Vec::new(); + + // Определяем порядок столбцов для результата + let result_columns = if columns.len() == 1 && columns[0] == "*" { + // Если SELECT *, используем порядок столбцов из схемы + self.schema.columns.iter().map(|c| c.name.clone()).collect::>() + } else { + // Иначе используем порядок из запроса + columns.to_vec() + }; + + // Используем индекс для поиска, если возможно + if let Some(clause) = where_clause { + let column_name = self.extract_column_from_where(clause); + if let Some(index) = self.get_index_for_column(&column_name) { + let clause_value = self.extract_value_from_where(clause); + if let Some(clause_value) = clause_value { + if let Some(record_ids) = index.search(&clause_value) { + for id in record_ids { + if let Some(record) = visible_records.iter().find(|r| r.id == id) { + if self.matches_where(record, clause) { + results.push(self.project_record(record, &result_columns)); + } + } + } + } + } + } else { + // Фильтрация по условию WHERE + for record in visible_records { + if self.matches_where(record, clause) { + results.push(self.project_record(record, &result_columns)); + } + } + } + } else { + // Без условия WHERE + for record in visible_records { + results.push(self.project_record(record, &result_columns)); + } + } + + // Применяем сортировку + if let Some(order_by) = order_by { + self.sort_results(&mut results, &order_by); + } + + // Применяем группировку + if let Some(group_by) = group_by { + results = self.group_results(results, &group_by); + } + + // Применяем объединения + if let Some(joins) = joins { + // Упрощенная реализация JOIN + for join in joins { + let _ = join; + } + } + + // Применяем LIMIT + if let Some(limit) = limit { + results.truncate(limit); + } + + Ok(results) + } + + /// Обновление записей с поддержкой MVCC + pub fn update(&self, updates: HashMap, where_clause: Option, tx_id: u64) + -> Result { + let mut count = 0; + + // Создаем snapshot записей + let mut records = self.records.borrow_mut(); + + for record in records.iter_mut().filter(|r| self.is_visible(r, tx_id) && !r.deleted) { + let mut matches = true; + + if let Some(clause) = &where_clause { + if !self.matches_where(record, clause) { + matches = false; + } + } + + if matches { + // Создаем новую версию записи для MVCC + let old_record = record.clone(); + + // Выполнение триггеров BEFORE UPDATE + self.execute_triggers("BEFORE_UPDATE", &old_record); + + // Применяем обновления к записи + for (column, value) in &updates { + record.values.insert(column.clone(), value.clone()); + } + + // Обновляем версию и tx_id + record.version = self.next_version.fetch_add(1, Ordering::SeqCst); + record.tx_id = tx_id; + + // Обновляем индексы + self.update_indexes_for_record(record.id, &updates, &old_record.values); + + // Выполнение триггеров AFTER UPDATE + self.execute_triggers("AFTER_UPDATE", record); + + count += 1; + } + } + + if count > 0 { + crate::utils::logger::log_debug(&format!("Updated {} records in table '{}'", count, self.name)); + } + + Ok(count) + } + + /// Удаление записей с поддержкой MVCC (логическое удаление) + pub fn delete(&self, where_clause: Option, tx_id: u64) -> Result { + let mut count = 0; + let mut records = self.records.borrow_mut(); + + for record in records.iter_mut().filter(|r| self.is_visible(r, tx_id) && !r.deleted) { + let mut matches = true; + + if let Some(clause) = &where_clause { + if !self.matches_where(record, clause) { + matches = false; + } + } + + if matches { + // Выполнение триггеров BEFORE DELETE + self.execute_triggers("BEFORE_DELETE", record); + + // Проверка внешних ключей + self.check_foreign_key_constraints(record)?; + + // Логическое удаление для MVCC + record.deleted = true; + record.version = self.next_version.fetch_add(1, Ordering::SeqCst); + record.tx_id = tx_id; + + // Удаляем из индексов + self.remove_from_indexes(record); + + // Выполнение триггеров AFTER DELETE + self.execute_triggers("AFTER_DELETE", record); + + count += 1; + } + } + + if count > 0 { + crate::utils::logger::log_debug(&format!("Deleted {} records from table '{}'", count, self.name)); + } + + Ok(count) + } + + /// Проверка видимости записи для транзакции (MVCC) + fn is_visible(&self, record: &Record, tx_id: u64) -> bool { + // Упрощенная реализация MVCC: + // 1. Запись должна быть не удалена или удалена текущей транзакцией + // 2. Запись должна быть создана до начала транзакции + // 3. Если запись была изменена, изменение должно быть выполнено текущей транзакцией + // или транзакция должна быть завершена + + if record.deleted && record.tx_id != tx_id { + return false; + } + + // В реальной MVCC здесь была бы проверка snapshot isolation + // Для простоты считаем, что все записи видны + true + } + + /// Экспорт в CSV (wait-free) + pub fn export_csv(&self, file_path: &str) -> Result<(), TableError> { + use csv::Writer; + + let mut wtr = Writer::from_path(file_path) + .map_err(|e| TableError::IoError(std::io::Error::new( + std::io::ErrorKind::Other, e.to_string() + )))?; + + // Записываем заголовки в порядке из схемы + let headers: Vec<&str> = self.schema.columns.iter() + .map(|c| c.name.as_str()) + .collect(); + wtr.write_record(&headers) + .map_err(|e| TableError::ExportError(e.to_string()))?; + + // Wait-free чтение записей + let records = self.records.borrow(); + + // Записываем данные (только неудаленные записи) + for record in records.iter().filter(|r| !r.deleted) { + let mut row = Vec::new(); + for column in &self.schema.columns { + let value = record.values.get(&column.name) + .map(|v| v.to_string()) + .unwrap_or_else(|| "NULL".to_string()); + row.push(value); + } + wtr.write_record(&row) + .map_err(|e| TableError::ExportError(e.to_string()))?; + } + + wtr.flush() + .map_err(|e| TableError::IoError(std::io::Error::new( + std::io::ErrorKind::Other, e.to_string() + )))?; + + crate::utils::logger::log_info(&format!("Exported table '{}' to '{}'", self.name, file_path)); + Ok(()) + } + + /// Импорт из CSV + pub fn import_csv(&mut self, file_path: &str) -> Result { + use csv::Reader; + + let mut rdr = Reader::from_path(file_path) + .map_err(|e| TableError::IoError(std::io::Error::new( + std::io::ErrorKind::Other, e.to_string() + )))?; + + let headers = rdr.headers() + .map_err(|e| TableError::ImportError(e.to_string()))? + .iter() + .map(|s| s.to_string()) + .collect::>(); + + let mut count = 0; + let tx_id = self.generate_tx_id(); + + for result in rdr.records() { + let record = result + .map_err(|e| TableError::ImportError(e.to_string()))?; + + let mut values = HashMap::new(); + for (i, field) in record.iter().enumerate() { + if i < headers.len() { + let value = if field == "NULL" { + Value::Null + } else if let Ok(int_val) = field.parse::() { + Value::Integer(int_val) + } else if let Ok(float_val) = field.parse::() { + Value::Float(float_val) + } else if field == "TRUE" { + Value::Boolean(true) + } else if field == "FALSE" { + Value::Boolean(false) + } else { + Value::Text(field.to_string()) + }; + + values.insert(headers[i].clone(), value); + } + } + + // Автоматически добавляем timestamp если его нет + if !values.contains_key("timestamp") { + let timestamp = chrono::Local::now().to_rfc3339(); + values.insert("timestamp".to_string(), Value::Text(timestamp)); + } + + self.insert(values, tx_id)?; + count += 1; + } + + crate::utils::logger::log_info(&format!("Imported {} records into table '{}' from '{}'", count, self.name, file_path)); + Ok(count) + } + + /// Генерация ID транзакции + fn generate_tx_id(&mut self) -> u64 { + let time_part = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_micros() as u64; + + // Используем fill_bytes для генерации случайной части + let mut random_bytes = [0u8; 8]; + self.rng.fill_bytes(&mut random_bytes); + let random_part = u64::from_le_bytes(random_bytes); + + (time_part << 16) | (random_part & 0xFFFF) + } + + /// Получение схемы таблицы + pub fn get_schema(&self) -> TableSchema { + self.schema.clone() + } + + /// Извлечение имени столбца из условия WHERE + fn extract_column_from_where(&self, clause: &WhereClause) -> String { + match &clause.left { + Expression::Column(name) => name.clone(), + _ => "".to_string(), + } + } + + /// Извлечение значения из условия WHERE + fn extract_value_from_where(&self, clause: &WhereClause) -> Option { + match &clause.right { + Some(Expression::Value(value)) => Some(value.clone()), + _ => None, + } + } + + /// Валидация записи + fn validate_record(&self, values: &HashMap) -> Result<(), TableError> { + for column in &self.schema.columns { + if let Some(value) = values.get(&column.name) { + // Пропускаем валидацию timestamp если он автоматический + if column.name == "timestamp" { + continue; + } + + // Проверка типа данных + match (&column.data_type, value) { + (DataType::Integer, Value::Integer(_)) => {}, + (DataType::Text, Value::Text(_)) => {}, + (DataType::Boolean, Value::Boolean(_)) => {}, + (DataType::Float, Value::Float(_)) => {}, + (DataType::Float, Value::Integer(i)) => { + // Разрешаем целые числа для float + let _ = i; + }, + _ => { + return Err(TableError::TypeMismatch( + column.name.clone(), + format!("{:?}", column.data_type), + format!("{:?}", value) + )); + } + } + } else if !column.nullable && column.name != "timestamp" { + return Err(TableError::NotNullViolation(column.name.clone())); + } + } + + Ok(()) + } + + /// Проверка внешних ключей + fn validate_foreign_keys(&self, record: &Record) -> Result<(), TableError> { + // Упрощенная реализация + Ok(()) + } + + /// Проверка ограничений CHECK + fn validate_checks(&self, record: &Record) -> Result<(), TableError> { + // Упрощенная реализация + Ok(()) + } + + /// Проверка ограничений внешних ключей при удалении + fn check_foreign_key_constraints(&self, record: &Record) -> Result<(), TableError> { + // Упрощенная реализация + Ok(()) + } + + /// Выполнение триггеров + fn execute_triggers(&self, event: &str, record: &Record) { + crate::utils::logger::log_debug(&format!("Trigger {} executed for record {} in table '{}'", + event, record.id, self.name)); + } + + /// Сортировка результатов + fn sort_results(&self, results: &mut Vec>, order_by: &[(String, bool)]) { + results.sort_by(|a, b| { + for (column, ascending) in order_by { + let a_val = a.get(column); + let b_val = b.get(column); + + match (a_val, b_val) { + (Some(a), Some(b)) => { + let cmp = self.compare_values(a, b); + if cmp != std::cmp::Ordering::Equal { + return if *ascending { cmp } else { cmp.reverse() }; + } + } + _ => {} + } + } + std::cmp::Ordering::Equal + }); + } + + /// Группировка результатов + fn group_results(&self, results: Vec>, group_by: &[String]) + -> Vec> { + let mut grouped = Vec::new(); + let mut groups = HashMap::new(); + + for row in results { + let key: Vec = group_by.iter() + .filter_map(|col| row.get(col).map(|v| v.to_string())) + .collect(); + let key_str = key.join("|"); + + groups.entry(key_str).or_insert_with(Vec::new).push(row); + } + + for (_, rows) in groups { + if let Some(first_row) = rows.into_iter().next() { + grouped.push(first_row); + } + } + + grouped + } + + /// Сравнение значений для сортировки + fn compare_values(&self, a: &Value, b: &Value) -> std::cmp::Ordering { + match (a, b) { + (Value::Integer(a), Value::Integer(b)) => a.cmp(b), + (Value::Float(a), Value::Float(b)) => { + // Безопасное сравнение float с обработкой NaN + a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal) + }, + (Value::Text(a), Value::Text(b)) => a.cmp(b), + (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b), + (Value::Null, Value::Null) => std::cmp::Ordering::Equal, + (Value::Null, _) => std::cmp::Ordering::Less, + (_, Value::Null) => std::cmp::Ordering::Greater, + _ => std::cmp::Ordering::Equal, + } + } + + /// Проверка условия WHERE + fn matches_where(&self, record: &Record, clause: &WhereClause) -> bool { + let column_name = match &clause.left { + Expression::Column(name) => name, + _ => return false, + }; + + let clause_value = match &clause.right { + Some(Expression::Value(value)) => value, + None => { + // Обработка IS NULL и IS NOT NULL + return match &clause.operator { + Operator::IsNull => { + match record.values.get(column_name) { + Some(value) => matches!(value, Value::Null), + None => true, + } + } + Operator::IsNotNull => { + match record.values.get(column_name) { + Some(value) => !matches!(value, Value::Null), + None => false, + } + } + _ => false, + }; + } + _ => return false, + }; + + match record.values.get(column_name) { + Some(value) => match &clause.operator { + Operator::Eq => values_equal(value, clause_value), + Operator::Ne => !values_equal(value, clause_value), + Operator::Gt => value_gt(value, clause_value), + Operator::Lt => value_lt(value, clause_value), + Operator::Ge => values_equal(value, clause_value) || value_gt(value, clause_value), + Operator::Le => values_equal(value, clause_value) || value_lt(value, clause_value), + Operator::Like => { + if let (Value::Text(s), Value::Text(pattern)) = (value, clause_value) { + pattern == "%" || s.contains(pattern.trim_matches('%')) + } else { + false + } + } + _ => false, + }, + None => false, + } + } + + /// Проекция записи с сохранением порядка столбцов + fn project_record(&self, record: &Record, columns: &[String]) -> HashMap { + let mut result = HashMap::new(); + + for column_name in columns { + if let Some(value) = record.values.get(column_name) { + result.insert(column_name.clone(), value.clone()); + } else { + result.insert(column_name.clone(), Value::Null); + } + } + + result + } + + /// Получение индекса для столбца + fn get_index_for_column(&self, column: &str) -> Option { + let primary_index = self.primary_index.borrow(); + let secondary_indexes = self.secondary_indexes.borrow(); + + if Some(column) == self.schema.primary_key.as_deref() { + (*primary_index).clone() + } else { + secondary_indexes.get(column).cloned() + } + } + + /// Перестроение индексов + fn rebuild_indexes(&self) { + let records = self.records.borrow(); + + // Перестраиваем первичный индекс + if let Some(pk) = &self.schema.primary_key { + let index = Index::new(pk); + for record in records.iter().filter(|r| !r.deleted) { + if let Some(value) = record.values.get(pk) { + index.insert(value.clone(), record.id); + } + } + *self.primary_index.borrow_mut() = Some(index); + } + + // Перестраиваем вторичные индексы + let mut new_indexes = HashMap::new(); + for index_name in &self.schema.indexes { + let index = Index::new(index_name); + for record in records.iter().filter(|r| !r.deleted) { + if let Some(value) = record.values.get(index_name) { + index.insert(value.clone(), record.id); + } + } + new_indexes.insert(index_name.clone(), index); + } + *self.secondary_indexes.borrow_mut() = new_indexes; + } + + /// Обновление индексов для новой записи + fn update_indexes(&self, record: &Record) { + // Обновляем первичный индекс + if let (Some(pk), Some(index)) = (&self.schema.primary_key, &mut *self.primary_index.borrow_mut()) { + if let Some(value) = record.values.get(pk) { + index.insert(value.clone(), record.id); + } + } + + // Обновляем вторичные индексы + let mut indexes = self.secondary_indexes.borrow_mut(); + for (index_name, index) in indexes.iter_mut() { + if let Some(value) = record.values.get(index_name) { + index.insert(value.clone(), record.id); + } + } + } + + /// Обновление индексов для измененной записи + fn update_indexes_for_record(&self, record_id: u64, updates: &HashMap, old_values: &HashMap) { + // Обновляем первичный индекс + if let (Some(pk), Some(index)) = (&self.schema.primary_key, &mut *self.primary_index.borrow_mut()) { + if let Some(new_value) = updates.get(pk) { + if let Some(old_value) = old_values.get(pk) { + index.remove(old_value, record_id); + } + index.insert(new_value.clone(), record_id); + } + } + + // Обновляем вторичные индексы + let mut indexes = self.secondary_indexes.borrow_mut(); + for (index_name, index) in indexes.iter_mut() { + if let Some(new_value) = updates.get(index_name) { + if let Some(old_value) = old_values.get(index_name) { + index.remove(old_value, record_id); + } + index.insert(new_value.clone(), record_id); + } + } + } + + /// Удаление из индексов + fn remove_from_indexes(&self, record: &Record) { + // Удаляем из первичного индекса + if let (Some(pk), Some(index)) = (&self.schema.primary_key, &mut *self.primary_index.borrow_mut()) { + if let Some(value) = record.values.get(pk) { + index.remove(value, record.id); + } + } + + // Удаляем из вторичных индексов + let mut indexes = self.secondary_indexes.borrow_mut(); + for (index_name, index) in indexes.iter_mut() { + if let Some(value) = record.values.get(index_name) { + index.remove(value, record.id); + } + } + } + + /// Получение имени таблицы + pub fn name(&self) -> &str { + &self.name + } + + /// Получение количества записей в таблице + pub fn record_count(&self) -> usize { + let records = self.records.borrow(); + records.iter().filter(|r| !r.deleted).count() + } + + /// Получение общего количества записей (включая удаленные) + pub fn total_record_count(&self) -> usize { + let records = self.records.borrow(); + records.len() + } +} + +/// Сравнение значений на равенство с безопасной обработкой float +fn values_equal(v1: &Value, v2: &Value) -> bool { + match (v1, v2) { + (Value::Integer(a), Value::Integer(b)) => a == b, + (Value::Text(a), Value::Text(b)) => a == b, + (Value::Boolean(a), Value::Boolean(b)) => a == b, + (Value::Float(a), Value::Float(b)) => { + // Безопасное сравнение float + if a.is_nan() && b.is_nan() { + true // Оба NaN считаем равными + } else { + (a - b).abs() < f64::EPSILON * 100.0 // С допуском + } + } + (Value::Null, Value::Null) => true, + _ => false, + } +} + +/// Сравнение значений (больше) с безопасной обработкой float +fn value_gt(v1: &Value, v2: &Value) -> bool { + match (v1, v2) { + (Value::Integer(a), Value::Integer(b)) => a > b, + (Value::Float(a), Value::Float(b)) => { + // Безопасное сравнение с NaN + !a.is_nan() && !b.is_nan() && a > b + } + (Value::Text(a), Value::Text(b)) => a > b, + _ => false, + } +} + +/// Сравнение значений (меньше) с безопасной обработкой float +fn value_lt(v1: &Value, v2: &Value) -> bool { + match (v1, v2) { + (Value::Integer(a), Value::Integer(b)) => a < b, + (Value::Float(a), Value::Float(b)) => { + // Безопасное сравнение с NaN + !a.is_nan() && !b.is_nan() && a < b + } + (Value::Text(a), Value::Text(b)) => a < b, + _ => false, + } +} + +/// Ошибки таблицы +#[derive(Debug, Error)] +pub enum TableError { + #[error("Table not found: {0}")] + NotFound(String), + + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("Parse error: {0}")] + ParseError(serde_json::Error), + + #[error("Serialize error: {0}")] + SerializeError(serde_json::Error), + + #[error("Type mismatch in column '{0}': expected {1}, got {2}")] + TypeMismatch(String, String, String), + + #[error("NOT NULL violation in column '{0}'")] + NotNullViolation(String), + + #[error("Duplicate value in unique column '{0}': {1}")] + DuplicateValue(String, String), + + #[error("Foreign key violation: {0}")] + ForeignKeyViolation(String), + + #[error("Check constraint violation: {0}")] + CheckViolation(String), + + #[error("Export error: {0}")] + ExportError(String), + + #[error("Import error: {0}")] + ImportError(String), + + #[error("MVCC conflict: {0}")] + MvccConflict(String), + + #[error("Transaction error: {0}")] + TransactionError(String), +}