Delete src/core/database.rs

This commit is contained in:
Григорий Сафронов 2026-01-15 19:38:25 +00:00
parent e2dfcc0445
commit dce895165b

View File

@ -1,665 +0,0 @@
//! Модуль управления базами данных flusql
//!
//! Этот модуль реализует функциональность создания, открытия,
//! управления и удаления баз данных. Каждая база данных содержит
//! коллекцию таблиц и управляет их жизненным циклом.
//!
//! Основные возможности:
//! - Создание новых баз данных
//! - Открытие существующих баз данных
//! - Создание и управление таблицами
//! - Удаление баз данных
//! - Сохранение и загрузка метаданных
//! - Управление транзакциями (базовое)
//! - Управление индексами
//! - Управление триггерами
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use crate::core::table::Table;
use crate::core::column_family::ColumnFamilyStorage;
use crate::utils::config::Config;
use thiserror::Error;
use serde_json;
use dashmap::DashMap;
use arc_swap::ArcSwap;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::task::JoinSet;
/// База данных flusql
///
/// Представляет собой контейнер для таблиц с метаданными.
/// Каждая база данных хранится в отдельной директории на диске.
///
/// АРХИТЕКТУРНЫЕ ХАРАКТЕРИСТИКИ:
/// - Файловая архитектура: каждая база данных в отдельной директории
/// - Wait-free подход: используется атомарная булева переменная для управления состоянием
/// - Отсутствие блокировок: не используются RwLock, Mutex или другие блокирующие примитивы
/// - Асинхронные операции: некоторые операции выполняются в фоновых потоках
/// - Колоночное хранение: используется семейство столбцов для wait-free доступа
pub struct Database {
/// Имя базы данных
name: String,
/// Таблицы в базе данных (имя -> таблица)
tables: DashMap<String, Table>,
/// Колоночное хранилище
column_families: ColumnFamilyStorage,
/// Конфигурация базы данных
config: Config,
/// Путь к директории с данными базы
data_dir: String,
/// Флаг активности базы данных (wait-free подход)
/// Используется атомарная операция для проверки состояния без блокировок
is_active: AtomicBool,
/// Индексы базы данных
indexes: DashMap<String, Vec<String>>,
/// Триггеры базы данных
triggers: DashMap<String, Trigger>,
/// Пулы для параллельной обработки
query_pools: ArcSwap<JoinSet<()>>,
}
/// Триггер базы данных
#[derive(Debug, Clone)]
struct Trigger {
name: String,
table: String,
timing: crate::parser::sql::TriggerTiming,
event: crate::parser::sql::TriggerEvent,
action: String,
}
impl Database {
/// Создание новой базы данных
///
/// Создает директорию для базы данных и инициализирует структуру.
/// Использует wait-free подход: не блокирует другие операции.
///
/// # Аргументы
/// * `name` - имя создаваемой базы данных
/// * `config` - конфигурация СУБД
///
/// # Возвращает
/// * `Result<Self, DatabaseError>` - новая база данных или ошибка
///
/// # Пример
/// ```
/// use flusql::Database;
/// use flusql::Config;
///
/// let config = Config::default();
/// let db = Database::create("mydb", &config).unwrap();
/// ```
pub fn create(name: &str, config: &Config) -> Result<Self, DatabaseError> {
let data_dir = config.get_data_path(name);
// Wait-free проверка существования базы данных
// Используем атомарные операции файловой системы
if Path::new(&data_dir).exists() {
return Err(DatabaseError::AlreadyExists(name.to_string()));
}
// Создаем директорию для базы данных
// Эта операция блокирующая, но выполняется только при создании
fs::create_dir_all(&data_dir)
.map_err(|e| DatabaseError::IoError(e))?;
// Создаем файл базы данных (basedb.db вместо mydb.db)
let db_file_path = format!("{}/basedb.db", data_dir);
// Создаем информационное содержимое для файла базы данных
let db_info = format!(
"flusql database: {}\ncreated: {}\nversion: 0.5.0\nstorage_format: csv\n",
name,
chrono::Local::now().to_rfc3339()
);
fs::write(&db_file_path, db_info).map_err(|e| DatabaseError::IoError(e))?;
// Логируем создание базы данных
crate::utils::logger::log_info(&format!("Database '{}' created at '{}'", name, db_file_path));
Ok(Self {
name: name.to_string(),
tables: DashMap::new(),
column_families: ColumnFamilyStorage::new(),
config: config.clone(),
data_dir,
is_active: AtomicBool::new(true),
indexes: DashMap::new(),
triggers: DashMap::new(),
query_pools: ArcSwap::new(std::sync::Arc::new(JoinSet::new())),
})
}
/// Открытие существующей базы данных
///
/// Загружает базу данных из директории, включая все таблицы и их данные.
/// Использует wait-free подход для проверки доступности.
///
/// # Аргументы
/// * `name` - имя открываемой базы данных
/// * `config` - конфигурация СУБД
///
/// # Возвращает
/// * `Result<Self, DatabaseError>` - открытая база данных или ошибка
///
/// # Пример
/// ```
/// let db = Database::open("mydb", &config).unwrap();
/// ```
pub fn open(name: &str, config: &Config) -> Result<Self, DatabaseError> {
let data_dir = config.get_data_path(name);
// Wait-free проверка существования базы данных
if !Path::new(&data_dir).exists() {
return Err(DatabaseError::NotFound(name.to_string()));
}
// Проверяем наличие файла базы данных (basedb.db вместо name.db)
let db_file_path = format!("{}/basedb.db", data_dir);
if !Path::new(&db_file_path).exists() {
// Для обратной совместимости проверяем старый формат
let old_db_file_path = format!("{}/{}.db", data_dir, name);
if !Path::new(&old_db_file_path).exists() {
return Err(DatabaseError::NotFound(name.to_string()));
}
}
// Загрузка метаданных базы данных
let meta_path = format!("{}/meta.json", data_dir);
let tables = if Path::new(&meta_path).exists() {
// Читаем файл метаданных
let meta_content = fs::read_to_string(&meta_path)
.map_err(|e| DatabaseError::IoError(e))?;
// Десериализуем список имен таблиц
let table_names: Vec<String> = serde_json::from_str(&meta_content)
.map_err(|e| DatabaseError::ParseError(e))?;
// Загружаем каждую таблицу
let tables_map = DashMap::new();
for table_name in table_names {
if let Ok(table) = Table::load(&data_dir, &table_name) {
tables_map.insert(table_name.clone(), table);
}
}
tables_map
} else {
// Если файл метаданных не существует, создаем пустую базу
DashMap::new()
};
// Логируем открытие базы данных
crate::utils::logger::log_info(&format!("Database '{}' opened from '{}'", name, db_file_path));
Ok(Self {
name: name.to_string(),
tables,
column_families: ColumnFamilyStorage::new(),
config: config.clone(),
data_dir,
is_active: AtomicBool::new(true),
indexes: DashMap::new(),
triggers: DashMap::new(),
query_pools: ArcSwap::new(std::sync::Arc::new(JoinSet::new())),
})
}
/// Создание таблицы в базе данных
///
/// # Аргументы
/// * `name` - имя создаваемой таблицы
/// * `schema` - схема таблицы
///
/// # Возвращает
/// * `Result<(), DatabaseError>` - успех или ошибка создания
///
/// # Пример
/// ```
/// use flusql::core::table::{TableSchema, ColumnSchema, DataType};
///
/// let schema = TableSchema {
/// columns: vec![
/// ColumnSchema {
/// name: "id".to_string(),
/// data_type: DataType::Integer,
/// nullable: false,
/// unique: true,
/// },
/// ],
/// primary_key: Some("id".to_string()),
/// indexes: vec![],
/// foreign_keys: vec![],
/// checks: vec![],
/// };
///
/// db.create_table("users", schema).unwrap();
/// ```
pub fn create_table(&self, name: &str, schema: crate::core::table::TableSchema)
-> Result<(), DatabaseError> {
// Wait-free проверка существования таблицы
if self.tables.contains_key(name) {
return Err(DatabaseError::TableExists(name.to_string()));
}
// Создаем новую таблицу
let table = Table::new(name, schema, &self.data_dir);
// Также создаем колоночное семейство
let _ = self.column_families.create_family(name, table.get_schema());
// Сохраняем таблицу на диск
table.save()
.map_err(|e| DatabaseError::TableError(e))?;
// Добавляем таблицу в коллекцию (lock-free вставка)
self.tables.insert(name.to_string(), table);
// Сохраняем обновленные метаданные
self.save_metadata()?;
// Логируем создание таблицы
crate::utils::logger::log_info(&format!("Table '{}' created in database '{}'", name, self.name));
Ok(())
}
/// Изменение таблицы
pub fn alter_table(&self, name: &str, operation: crate::parser::sql::AlterOperation)
-> Result<(), DatabaseError> {
if let Some(table) = self.tables.get_mut(name) {
// Упрощенная реализация - в реальной системе здесь нужно
// перестраивать таблицу и обновлять данные
match operation {
crate::parser::sql::AlterOperation::AddColumn(column_def) => {
// Добавление столбца
let column_schema = crate::core::table::ColumnSchema {
name: column_def.name,
data_type: match column_def.data_type {
crate::parser::sql::DataType::Integer => crate::core::table::DataType::Integer,
crate::parser::sql::DataType::Text(_) => crate::core::table::DataType::Text,
crate::parser::sql::DataType::Boolean => crate::core::table::DataType::Boolean,
crate::parser::sql::DataType::Float => crate::core::table::DataType::Float,
crate::parser::sql::DataType::Numeric(_) => crate::core::table::DataType::Integer, // Упрощение
crate::parser::sql::DataType::Timestamp => crate::core::table::DataType::Text, // Упрощение
crate::parser::sql::DataType::Array(_) => crate::core::table::DataType::Text, // Упрощение
crate::parser::sql::DataType::Json => crate::core::table::DataType::Text, // Упрощение
crate::parser::sql::DataType::Jsonb => crate::core::table::DataType::Text, // Упрощение
crate::parser::sql::DataType::Uuid => crate::core::table::DataType::Text, // Упрощение
crate::parser::sql::DataType::Bytea => crate::core::table::DataType::Text, // Упрощение
},
nullable: column_def.nullable,
unique: column_def.unique,
};
// В реальной реализации здесь нужно добавлять столбец к схеме таблицы
// и обновлять существующие записи
}
crate::parser::sql::AlterOperation::DropColumn(column_name) => {
// Удаление столбца
// В реальной реализации здесь нужно удалять столбец из схемы
// и удалять данные этого столбца
}
crate::parser::sql::AlterOperation::AlterColumnType { name: column_name, data_type, using } => {
// Изменение типа столбца
// В реальной реализации здесь нужно изменять схему столбца
// и конвертировать существующие данные
let _ = (column_name, data_type, using); // Используем переменные, чтобы избежать предупреждений
}
crate::parser::sql::AlterOperation::SetNotNull(column_name) => {
// Установка NOT NULL
let _ = column_name;
}
crate::parser::sql::AlterOperation::DropNotNull(column_name) => {
// Удаление NOT NULL
let _ = column_name;
}
crate::parser::sql::AlterOperation::SetDefault { name: column_name, default } => {
// Установка значения по умолчанию
let _ = (column_name, default);
}
crate::parser::sql::AlterOperation::DropDefault(column_name) => {
// Удаление значения по умолчанию
let _ = column_name;
}
crate::parser::sql::AlterOperation::RenameColumn { old_name, new_name } => {
// Переименование столбца
let _ = (old_name, new_name);
}
crate::parser::sql::AlterOperation::RenameTable { new_name } => {
// Переименование таблицы
let _ = new_name;
}
_ => {
// Другие операции пока не поддерживаются
return Err(DatabaseError::TransactionError(
"Operation not supported".to_string()
));
}
}
// Сохраняем изменения
let table_clone = table.clone();
std::thread::spawn(move || {
if let Err(e) = table_clone.save() {
log::error!("Failed to save table: {}", e);
}
});
Ok(())
} else {
Err(DatabaseError::TableNotFound(name.to_string()))
}
}
/// Создание индекса
pub fn create_index(&self, table: &str, columns: &[String], name: Option<String>)
-> Result<(), DatabaseError> {
let index_name = name.unwrap_or_else(|| {
format!("idx_{}_{}", table, columns.join("_"))
});
self.indexes.entry(table.to_string()).or_insert_with(Vec::new).push(index_name);
crate::utils::logger::log_info(&format!("Index created on table '{}'", table));
Ok(())
}
/// Удаление индекса
pub fn drop_index(&self, table: &str, name: &str) -> Result<(), DatabaseError> {
if let Some(mut indexes) = self.indexes.get_mut(table) {
if let Some(pos) = indexes.iter().position(|n| n == name) {
indexes.remove(pos);
crate::utils::logger::log_info(&format!("Index '{}' dropped from table '{}'", name, table));
Ok(())
} else {
Err(DatabaseError::TransactionError(
format!("Index '{}' not found on table '{}'", name, table)
))
}
} else {
Err(DatabaseError::TransactionError(
format!("No indexes found for table '{}'", table)
))
}
}
/// Создание триггера
pub fn create_trigger(
&self,
name: &str,
table: &str,
timing: crate::parser::sql::TriggerTiming,
event: crate::parser::sql::TriggerEvent,
action: &str,
) -> Result<(), DatabaseError> {
let trigger = Trigger {
name: name.to_string(),
table: table.to_string(),
timing,
event,
action: action.to_string(),
};
self.triggers.insert(name.to_string(), trigger);
crate::utils::logger::log_info(&format!("Trigger '{}' created", name));
Ok(())
}
/// Удаление триггера
pub fn drop_trigger(&self, name: &str) -> Result<(), DatabaseError> {
if self.triggers.remove(name).is_some() {
crate::utils::logger::log_info(&format!("Trigger '{}' dropped", name));
Ok(())
} else {
Err(DatabaseError::TransactionError(
format!("Trigger '{}' not found", name)
))
}
}
/// Получение таблицы (неизменяемая ссылка)
///
/// Wait-free операция: не блокирует другие операции с базой данных.
///
/// # Аргументы
/// * `name` - имя таблицы
///
/// # Возвращает
/// * `Option<Table>` - копия таблицы или None если не найдена
pub fn get_table(&self, name: &str) -> Option<Table> {
self.tables.get(name).map(|entry| entry.value().clone())
}
/// Получение таблицы для изменения
///
/// Эта операция может блокировать другие операции с той же таблицей,
/// но не блокирует операции с другими таблицами в базе данных.
///
/// # Аргументы
/// * `name` - имя таблицы
///
/// # Возвращает
/// * `Option<impl FnOnce(&mut Table) -> R>` - функция для изменения таблицы
pub fn with_table_mut<F, R>(&self, name: &str, f: F) -> Option<R>
where
F: FnOnce(&mut Table) -> R,
{
if let Some(mut table) = self.tables.get_mut(name) {
Some(f(&mut table))
} else {
None
}
}
/// Параллельное выполнение SQL запросов
pub async fn execute_parallel(&self, queries: Vec<String>) -> Result<Vec<String>, DatabaseError> {
use tokio::task::JoinSet;
let mut results = Vec::with_capacity(queries.len());
let mut tasks = JoinSet::new();
for query in queries {
let db_name = self.name.clone();
tasks.spawn(async move {
// Здесь должен быть парсинг и выполнение запроса
// Для примера просто возвращаем результат
format!("Executed in {}: {}", db_name, query)
});
}
while let Some(result) = tasks.join_next().await {
match result {
Ok(res) => results.push(res),
Err(e) => return Err(DatabaseError::TransactionError(format!("Task error: {}", e))),
}
}
Ok(results)
}
/// Удаление базы данных
///
/// Удаляет директорию базы данных со всеми таблицами и данными.
/// Использует wait-free подход для установки флага неактивности.
///
/// # Возвращает
/// * `Result<(), DatabaseError>` - успех или ошибка удаления
pub async fn drop(self) -> Result<(), DatabaseError> {
// Устанавливаем флаг неактивности wait-free способом
self.is_active.store(false, Ordering::SeqCst);
if Path::new(&self.data_dir).exists() {
// Рекурсивно удаляем директорию базы данных асинхронно
tokio::fs::remove_dir_all(&self.data_dir).await
.map_err(|e| DatabaseError::IoError(std::io::Error::from(e)))?;
// Логируем удаление базы данных
crate::utils::logger::log_info(&format!("Database '{}' dropped", self.name));
}
Ok(())
}
/// Получение списка таблиц в базе данных
///
/// Wait-free операция: просто возвращает копию ключей DashMap.
///
/// # Возвращает
/// * `Vec<String>` - имена всех таблиц в базе данных
pub fn list_tables(&self) -> Vec<String> {
self.tables.iter().map(|entry| entry.key().clone()).collect()
}
/// Сохранение метаданных базы данных
///
/// Сохраняет информацию о всех таблицах в файл meta.json.
/// Эта операция может блокировать доступ к файлу на короткое время.
///
/// # Возвращает
/// * `Result<(), DatabaseError>` - успех или ошибка сохранения
fn save_metadata(&self) -> Result<(), DatabaseError> {
let meta_path = format!("{}/meta.json", self.data_dir);
// Собираем имена всех таблиц
let table_names: Vec<String> = self.list_tables();
// Сериализуем в JSON
let meta_content = serde_json::to_string_pretty(&table_names)
.map_err(|e| DatabaseError::SerializeError(e))?;
// Записываем в файл
fs::write(meta_path, meta_content)
.map_err(|e| DatabaseError::IoError(e))
}
/// Получение имени базы данных
///
/// Wait-free операция: просто возвращает ссылку на строку.
///
/// # Возвращает
/// * `&str` - имя базы данных
pub fn name(&self) -> &str {
&self.name
}
/// Получение количества таблиц в базе данных
///
/// Wait-free операция: просто возвращает размер DashMap.
///
/// # Возвращает
/// * `usize` - количество таблиц
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Получение директории данных базы
///
/// Wait-free операция: просто возвращает ссылку на строку.
///
/// # Возвращает
/// * `&str` - путь к директории данных
pub fn data_dir(&self) -> &str {
&self.data_dir
}
/// Проверка активности базы данных
///
/// Wait-free операция: использует атомарное чтение булевой переменной.
///
/// # Возвращает
/// * `bool` - true если база данных активна
pub fn is_active(&self) -> bool {
self.is_active.load(Ordering::SeqCst)
}
/// Начало транзакции
///
/// В текущей реализации транзакции эмулируются на уровне REPL.
/// В будущих версиях может быть реализована полноценная поддержка WAL.
///
/// # Возвращает
/// * `Result<(), DatabaseError>` - успех или ошибка
pub fn begin_transaction(&self) -> Result<(), DatabaseError> {
// В текущей реализации просто логируем начало транзакции
crate::utils::logger::log_info(&format!("Transaction started in database '{}'", self.name));
Ok(())
}
/// Фиксация транзакции
///
/// Сохраняет все изменения, сделанные в текущей транзакции.
///
/// # Возвращает
/// * `Result<(), DatabaseError>` - успех или ошибка
pub fn commit_transaction(&self) -> Result<(), DatabaseError> {
// В текущей реализации просто логируем фиксацию транзакции
crate::utils::logger::log_info(&format!("Transaction committed in database '{}'", self.name));
Ok(())
}
/// Откат транзакции
///
/// Отменяет все изменения, сделанные в текущей транзакции.
///
/// # Возвращает
/// * `Result<(), DatabaseError>` - успех или ошибка
pub fn rollback_transaction(&self) -> Result<(), DatabaseError> {
// В текущей реализации просто логируем откат транзакции
crate::utils::logger::log_info(&format!("Transaction rolled back in database '{}'", self.name));
Ok(())
}
}
/// Ошибки базы данных
///
/// Определяет все возможные ошибки, которые могут возникнуть
/// при работе с базами данных flusql.
#[derive(Debug, Error)]
pub enum DatabaseError {
#[error("Database already exists: {0}")]
AlreadyExists(String),
#[error("Database not found: {0}")]
NotFound(String),
#[error("Table already exists: {0}")]
TableExists(String),
#[error("Table not found: {0}")]
TableNotFound(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Parse error: {0}")]
ParseError(serde_json::Error),
#[error("Serialize error: {0}")]
SerializeError(serde_json::Error),
#[error("Configuration error: {0}")]
ConfigError(String),
#[error("Table error: {0}")]
TableError(#[from] crate::core::table::TableError),
#[error("Transaction error: {0}")]
TransactionError(String),
#[error("Concurrency error: {0}")]
ConcurrencyError(String),
}