futriix/src/server/database.rs

637 lines
27 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// src/server/database.rs
//! Wait-Free документо-ориентированная база данных Futriix
//!
//! Реализует wait-free доступ к данным с использованием атомарных
//! ссылок и lock-free структур данных для максимальной производительности.
//! Автоматически добавляет временные метки ко всем операциям с документами.
#![allow(unused_imports)]
#![allow(dead_code)]
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use serde::{Serialize, Deserialize};
use serde_json::Value;
use uuid::Uuid;
use dashmap::DashMap;
use crate::common::Result;
use crate::common::protocol;
/// Триггеры для коллекций
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TriggerEvent {
BeforeCreate,
AfterCreate,
BeforeUpdate,
AfterUpdate,
BeforeDelete,
AfterDelete,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trigger {
pub name: String,
pub event: TriggerEvent,
pub collection: String,
pub lua_code: String,
}
/// Типы индексов
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
Primary,
Secondary,
}
/// Структура индекса
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Index {
pub name: String,
pub index_type: IndexType,
pub field: String,
pub unique: bool,
}
/// Wait-Free коллекция документов
#[derive(Clone)]
pub struct Collection {
name: String,
documents: Arc<RwLock<std::collections::HashMap<String, Vec<u8>>>>,
sequence: Arc<AtomicU64>,
triggers: Arc<RwLock<Vec<Trigger>>>,
indexes: Arc<RwLock<std::collections::HashMap<String, Index>>>,
index_data: Arc<DashMap<String, std::collections::HashMap<String, Vec<String>>>>,
}
impl Collection {
/// Создание новой wait-free коллекции
pub fn new(name: String) -> Self {
Self {
name,
documents: Arc::new(RwLock::new(std::collections::HashMap::new())),
sequence: Arc::new(AtomicU64::new(0)),
triggers: Arc::new(RwLock::new(Vec::new())),
indexes: Arc::new(RwLock::new(std::collections::HashMap::new())),
index_data: Arc::new(DashMap::new()),
}
}
/// Функция для логирования операций с временной меткой
fn log_operation(&self, operation: &str, id: &str) {
use std::fs::OpenOptions;
use std::io::Write;
// ИСПРАВЛЕНИЕ: Используем системное время с миллисекундами
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n",
timestamp, self.name, operation, id);
// Логируем в файл
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open("futriix.log")
{
let _ = file.write_all(log_message.as_bytes());
}
// Также выводим в консоль для отладки
println!("{}", log_message.trim());
}
/// Добавление временной метки к документу
fn add_timestamp_to_document(&self, document: Vec<u8>, operation: &str) -> Result<Vec<u8>> {
// ИСПРАВЛЕНИЕ: Используем системное время с миллисекундами
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
// Парсим документ как JSON
let mut doc_value: Value = serde_json::from_slice(&document)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
// Добавляем временные метки
if let Value::Object(ref mut obj) = doc_value {
obj.insert("_timestamp".to_string(), Value::String(timestamp.clone()));
obj.insert("_operation".to_string(), Value::String(operation.to_string()));
}
// Сериализуем обратно в байты
serde_json::to_vec(&doc_value)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))
}
/// Добавление триггера
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
let mut triggers = self.triggers.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
triggers.push(trigger);
Ok(())
}
/// Получение триггеров для события
#[allow(dead_code)]
pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result<Vec<Trigger>> {
let triggers = self.triggers.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(triggers.iter()
.filter(|t| t.event == event)
.cloned()
.collect())
}
/// Создание индекса
pub fn create_index(&self, index: Index) -> Result<()> {
let mut indexes = self.indexes.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if indexes.contains_key(&index.name) {
return Err(crate::common::FutriixError::DatabaseError(
format!("Index already exists: {}", index.name)
));
}
// Создаем структуру для хранения данных индекса
self.index_data.insert(index.name.clone(), std::collections::HashMap::new());
let index_clone = index.clone();
indexes.insert(index.name.clone(), index);
// Перестраиваем индекс для существующих документов
self.rebuild_index(&index_clone.name)?;
Ok(())
}
/// Перестроение индекса
fn rebuild_index(&self, index_name: &str) -> Result<()> {
let indexes = self.indexes.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let index = indexes.get(index_name)
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
format!("Index not found: {}", index_name)
))?;
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut index_map = std::collections::HashMap::new();
for (id, document_bytes) in documents.iter() {
if let Ok(document) = serde_json::from_slice::<Value>(document_bytes) {
if let Some(field_value) = document.get(&index.field) {
// Конвертируем значение в строку для использования в HashMap
let value_str = field_value.to_string();
let entry = index_map.entry(value_str).or_insert_with(Vec::new);
entry.push(id.clone());
// Проверка уникальности для уникальных индексов
if index.unique && entry.len() > 1 {
return Err(crate::common::FutriixError::DatabaseError(
format!("Duplicate value {} for unique index {}", field_value, index_name)
));
}
}
}
}
self.index_data.insert(index_name.to_string(), index_map);
Ok(())
}
/// Обновление индекса при изменении документа
fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> {
let indexes = self.indexes.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let new_doc_value: Value = serde_json::from_slice(new_document)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
let old_doc_value: Option<Value> = old_document
.and_then(|doc| serde_json::from_slice(doc).ok());
for (index_name, index) in indexes.iter() {
if let Some(mut index_map) = self.index_data.get_mut(index_name) {
// Удаляем старые значения из индекса
if let Some(old_doc) = &old_doc_value {
if let Some(old_value) = old_doc.get(&index.field) {
let old_value_str = old_value.to_string();
if let Some(entries) = index_map.get_mut(&old_value_str) {
entries.retain(|id| id != document_id);
if entries.is_empty() {
index_map.remove(&old_value_str);
}
}
}
}
// Добавляем новые значения в индекс
if let Some(new_value) = new_doc_value.get(&index.field) {
let new_value_str = new_value.to_string();
let entries = index_map.entry(new_value_str).or_insert_with(Vec::new);
// Проверка уникальности
if index.unique && !entries.is_empty() && entries[0] != document_id {
return Err(crate::common::FutriixError::DatabaseError(
format!("Duplicate value {} for unique index {}", new_value, index_name)
));
}
if !entries.contains(&document_id.to_string()) {
entries.push(document_id.to_string());
}
}
}
}
Ok(())
}
/// Поиск по индексу
pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result<Vec<String>> {
let index_map = self.index_data.get(index_name)
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
format!("Index not found: {}", index_name)
))?;
let value_str = value.to_string();
Ok(index_map.get(&value_str).cloned().unwrap_or_default())
}
/// Wait-Free создание документа с временной меткой
pub fn create_document(&self, document: Vec<u8>) -> Result<String> {
let id = Uuid::new_v4().to_string();
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
// Добавляем временную метку к документу
let document_with_timestamp = self.add_timestamp_to_document(document, "create")?;
// ИСПРАВЛЕНИЕ: Сначала проверяем индексы, потом вставляем документ
// Это предотвращает ситуацию, когда документ вставлен, но индексы не обновлены
self.update_indexes(None, &document_with_timestamp, &id)?;
let mut documents = self.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Проверяем, не существует ли уже документ с таким ID
if documents.contains_key(&id) {
return Err(crate::common::FutriixError::DatabaseError(
format!("Document with ID {} already exists", id)
));
}
documents.insert(id.clone(), document_with_timestamp);
// Логируем операцию
self.log_operation("create", &id);
println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq);
Ok(id)
}
/// Wait-Free чтение документа
pub fn read_document(&self, id: &str) -> Result<Option<Vec<u8>>> {
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Логируем операцию чтения
self.log_operation("read", id);
Ok(documents.get(id).cloned())
}
/// Wait-Free обновление документа с временной меткой
pub fn update_document(&self, id: &str, document: Vec<u8>) -> Result<()> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
// Добавляем временную метку к документу
let document_with_timestamp = self.add_timestamp_to_document(document, "update")?;
let mut documents = self.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(old_document) = documents.get(id) {
// Обновляем индексы
self.update_indexes(Some(old_document), &document_with_timestamp, id)?;
documents.insert(id.to_string(), document_with_timestamp);
// Логируем операцию
self.log_operation("update", id);
println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq);
Ok(())
} else {
Err(crate::common::FutriixError::DatabaseError(
format!("Document not found: {}", id)
))
}
}
/// Wait-Free удаление документа с временной меткой
pub fn delete_document(&self, id: &str) -> Result<()> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let mut documents = self.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(old_document) = documents.get(id) {
// Удаляем из индексов
self.update_indexes(Some(old_document), &[], id)?;
documents.remove(id);
// Логируем операцию
self.log_operation("delete", id);
println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq);
Ok(())
} else {
Err(crate::common::FutriixError::DatabaseError(
format!("Document not found: {}", id)
))
}
}
/// Wait-Free запрос документов
pub fn query_documents(&self, _filter: Vec<u8>) -> Result<Vec<Vec<u8>>> {
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Логируем операцию запроса
self.log_operation("query", "multiple");
// TODO: Реализовать wait-free фильтрацию на основе filter
let documents: Vec<Vec<u8>> = documents.values().cloned().collect();
Ok(documents)
}
/// Получение имени коллекции (wait-free)
#[allow(dead_code)]
pub fn get_name(&self) -> &str {
&self.name
}
/// Получение количества документов (wait-free)
#[allow(dead_code)]
pub fn count_documents(&self) -> Result<usize> {
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(documents.len())
}
}
/// Wait-Free база данных
#[derive(Clone)]
pub struct Database {
collections: Arc<DashMap<String, Collection>>,
procedures: Arc<DashMap<String, Vec<u8>>>,
transactions: Arc<DashMap<String, Vec<protocol::Command>>>,
}
impl Database {
/// Создание новой wait-free базы данных
pub fn new() -> Self {
Self {
collections: Arc::new(DashMap::new()),
procedures: Arc::new(DashMap::new()),
transactions: Arc::new(DashMap::new()),
}
}
/// Wait-Free получение или создание коллекции
pub fn get_collection(&self, name: &str) -> Collection {
if let Some(collection) = self.collections.get(name) {
return collection.clone();
}
// Создаем новую коллекцию wait-free способом
let new_collection = Collection::new(name.to_string());
self.collections.insert(name.to_string(), new_collection.clone());
new_collection
}
/// Wait-Free выполнение команды
pub fn execute_command(&self, command: protocol::Command) -> Result<protocol::Response> {
match command {
protocol::Command::Create { collection, document } => {
let coll = self.get_collection(&collection);
match coll.create_document(document) {
Ok(id) => Ok(protocol::Response::Success(id.into_bytes())),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Read { collection, id } => {
let coll = self.get_collection(&collection);
match coll.read_document(&id) {
Ok(Some(document)) => Ok(protocol::Response::Success(document)),
Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Update { collection, id, document } => {
let coll = self.get_collection(&collection);
match coll.update_document(&id, document) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Delete { collection, id } => {
let coll = self.get_collection(&collection);
match coll.delete_document(&id) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Query { collection, filter } => {
let coll = self.get_collection(&collection);
match coll.query_documents(filter) {
Ok(documents) => {
let json_docs: Vec<Value> = documents.into_iter()
.filter_map(|doc| serde_json::from_slice(&doc).ok())
.collect();
match serde_json::to_vec(&json_docs) {
Ok(data) => Ok(protocol::Response::Success(data)),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::CreateProcedure { name, code } => {
// ИСПРАВЛЕНИЕ: Сохраняем процедуру в procedures map вместо коллекции
self.procedures.insert(name, code);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CallProcedure { name } => {
if let Some(code) = self.procedures.get(&name) {
// TODO: Выполнить Lua код процедуры
Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes()))
} else {
Ok(protocol::Response::Error(format!("Procedure not found: {}", name)))
}
}
protocol::Command::BeginTransaction { transaction_id } => {
if self.transactions.contains_key(&transaction_id) {
return Ok(protocol::Response::Error("Transaction already exists".to_string()));
}
self.transactions.insert(transaction_id, Vec::new());
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CommitTransaction { transaction_id } => {
if let Some((_, commands)) = self.transactions.remove(&transaction_id) {
// Выполняем все команды транзакции wait-free способом
for cmd in commands {
if let Err(e) = self.execute_command(cmd) {
return Ok(protocol::Response::Error(format!("Transaction failed: {}", e)));
}
}
Ok(protocol::Response::Success(vec![]))
} else {
Ok(protocol::Response::Error("Transaction not found".to_string()))
}
}
protocol::Command::RollbackTransaction { transaction_id } => {
if self.transactions.remove(&transaction_id).is_some() {
Ok(protocol::Response::Success(vec![]))
} else {
Ok(protocol::Response::Error("Transaction not found".to_string()))
}
}
protocol::Command::CreateIndex { collection, index } => {
let coll = self.get_collection(&collection);
match coll.create_index(index) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::QueryByIndex { collection, index_name, value } => {
let coll = self.get_collection(&collection);
let value: Value = serde_json::from_slice(&value)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
match coll.query_by_index(&index_name, &value) {
Ok(document_ids) => {
let result = serde_json::to_vec(&document_ids)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
Ok(protocol::Response::Success(result))
}
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
// Обработка остальных команд...
_ => {
// TODO: Реализовать обработку остальных команд
Ok(protocol::Response::Error("Command not implemented".to_string()))
}
}
}
/// Wait-Free получение статистики базы данных
#[allow(dead_code)]
pub fn get_stats(&self) -> Result<std::collections::HashMap<String, usize>> {
let mut stats = std::collections::HashMap::new();
stats.insert("collections".to_string(), self.collections.len());
stats.insert("procedures".to_string(), self.procedures.len());
stats.insert("active_transactions".to_string(), self.transactions.len());
// Подсчет документов во всех коллекциях
let total_documents: usize = self.collections.iter()
.map(|entry| entry.value().count_documents().unwrap_or(0))
.sum();
stats.insert("total_documents".to_string(), total_documents);
Ok(stats)
}
/// Создание бэкапа базы данных
pub fn create_backup(&self) -> Result<std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>> {
let mut backup = std::collections::HashMap::new();
for entry in self.collections.iter() {
let name = entry.key().clone();
let collection = entry.value();
let documents = collection.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut collection_backup = std::collections::HashMap::new();
for (id, document) in documents.iter() {
collection_backup.insert(id.clone(), document.clone());
}
backup.insert(name, collection_backup);
}
Ok(backup)
}
/// Восстановление из бэкапа
pub fn restore_from_backup(&self, backup: std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>) -> Result<()> {
// Очищаем существующие коллекции
self.collections.clear();
// Восстанавливаем данные из бэкапа
for (collection_name, documents) in backup {
let collection = Collection::new(collection_name.clone());
{
let mut collection_docs = collection.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
for (id, document) in documents {
collection_docs.insert(id, document);
}
}
self.collections.insert(collection_name, collection);
}
Ok(())
}
/// Добавление триггера к коллекции
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
let collection = self.get_collection(&trigger.collection);
collection.add_trigger(trigger)
}
}