Upload files to "src/server"
This commit is contained in:
parent
ac4552144c
commit
a38e05def5
718
src/server/database.rs
Normal file
718
src/server/database.rs
Normal file
@ -0,0 +1,718 @@
|
|||||||
|
// src/server/database.rs
|
||||||
|
//! Wait-Free документо-ориентированная база данных Futriix
|
||||||
|
//!
|
||||||
|
//! Реализует wait-free доступ к данным с использованием атомарных
|
||||||
|
//! ссылок и lock-free структур данных для максимальной производительности.
|
||||||
|
//! Автоматически добавляет временные метки ко всем операциям с документами.
|
||||||
|
|
||||||
|
#![allow(unused_imports)]
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
use std::sync::RwLock;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use serde_json::Value;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use dashmap::DashMap;
|
||||||
|
|
||||||
|
use crate::common::Result;
|
||||||
|
use crate::common::protocol;
|
||||||
|
|
||||||
|
/// Триггеры для коллекций
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub enum TriggerEvent {
|
||||||
|
BeforeCreate,
|
||||||
|
AfterCreate,
|
||||||
|
BeforeUpdate,
|
||||||
|
AfterUpdate,
|
||||||
|
BeforeDelete,
|
||||||
|
AfterDelete,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Trigger {
|
||||||
|
pub name: String,
|
||||||
|
pub event: TriggerEvent,
|
||||||
|
pub collection: String,
|
||||||
|
pub lua_code: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Типы индексов
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum IndexType {
|
||||||
|
Primary,
|
||||||
|
Secondary,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Структура индекса
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Index {
|
||||||
|
pub name: String,
|
||||||
|
pub index_type: IndexType,
|
||||||
|
pub field: String,
|
||||||
|
pub unique: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free коллекция документов
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Collection {
|
||||||
|
name: String,
|
||||||
|
documents: Arc<RwLock<std::collections::HashMap<String, Vec<u8>>>>,
|
||||||
|
sequence: Arc<AtomicU64>,
|
||||||
|
triggers: Arc<RwLock<Vec<Trigger>>>,
|
||||||
|
indexes: Arc<RwLock<std::collections::HashMap<String, Index>>>,
|
||||||
|
index_data: Arc<DashMap<String, std::collections::HashMap<String, Vec<String>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Collection {
|
||||||
|
/// Создание новой wait-free коллекции
|
||||||
|
pub fn new(name: String) -> Self {
|
||||||
|
Self {
|
||||||
|
name,
|
||||||
|
documents: Arc::new(RwLock::new(std::collections::HashMap::new())),
|
||||||
|
sequence: Arc::new(AtomicU64::new(0)),
|
||||||
|
triggers: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
indexes: Arc::new(RwLock::new(std::collections::HashMap::new())),
|
||||||
|
index_data: Arc::new(DashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Функция для логирования операций с временной меткой
|
||||||
|
fn log_operation(&self, operation: &str, id: &str) {
|
||||||
|
use std::fs::OpenOptions;
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
|
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
|
||||||
|
let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n",
|
||||||
|
timestamp, self.name, operation, id);
|
||||||
|
|
||||||
|
// Логируем в файл
|
||||||
|
if let Ok(mut file) = OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open("futriix.log")
|
||||||
|
{
|
||||||
|
let _ = file.write_all(log_message.as_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Также выводим в консоль для отладки
|
||||||
|
println!("{}", log_message.trim());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Добавление временной метки к документу
|
||||||
|
fn add_timestamp_to_document(&self, document: Vec<u8>, operation: &str) -> Result<Vec<u8>> {
|
||||||
|
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
|
||||||
|
|
||||||
|
// Парсим документ как JSON
|
||||||
|
let mut doc_value: Value = serde_json::from_slice(&document)
|
||||||
|
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||||
|
|
||||||
|
// Добавляем временные метки
|
||||||
|
if let Value::Object(ref mut obj) = doc_value {
|
||||||
|
obj.insert("_timestamp".to_string(), Value::String(timestamp.clone()));
|
||||||
|
obj.insert("_operation".to_string(), Value::String(operation.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Сериализуем обратно в байты
|
||||||
|
serde_json::to_vec(&doc_value)
|
||||||
|
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Добавление триггера
|
||||||
|
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
|
||||||
|
let mut triggers = self.triggers.write()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
triggers.push(trigger);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Получение триггеров для события
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result<Vec<Trigger>> {
|
||||||
|
let triggers = self.triggers.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
Ok(triggers.iter()
|
||||||
|
.filter(|t| t.event == event)
|
||||||
|
.cloned()
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Создание индекса
|
||||||
|
pub fn create_index(&self, index: Index) -> Result<()> {
|
||||||
|
let mut indexes = self.indexes.write()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
if indexes.contains_key(&index.name) {
|
||||||
|
return Err(crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Index already exists: {}", index.name)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Создаем структуру для хранения данных индекса
|
||||||
|
self.index_data.insert(index.name.clone(), std::collections::HashMap::new());
|
||||||
|
|
||||||
|
let index_clone = index.clone();
|
||||||
|
indexes.insert(index.name.clone(), index);
|
||||||
|
|
||||||
|
// Перестраиваем индекс для существующих документов
|
||||||
|
self.rebuild_index(&index_clone.name)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Перестроение индекса
|
||||||
|
fn rebuild_index(&self, index_name: &str) -> Result<()> {
|
||||||
|
let indexes = self.indexes.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let index = indexes.get(index_name)
|
||||||
|
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Index not found: {}", index_name)
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let documents = self.documents.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let mut index_map = std::collections::HashMap::new();
|
||||||
|
|
||||||
|
for (id, document_bytes) in documents.iter() {
|
||||||
|
if let Ok(document) = serde_json::from_slice::<Value>(document_bytes) {
|
||||||
|
if let Some(field_value) = document.get(&index.field) {
|
||||||
|
// Конвертируем значение в строку для использования в HashMap
|
||||||
|
let value_str = field_value.to_string();
|
||||||
|
let entry = index_map.entry(value_str).or_insert_with(Vec::new);
|
||||||
|
entry.push(id.clone());
|
||||||
|
|
||||||
|
// Проверка уникальности для уникальных индексов
|
||||||
|
if index.unique && entry.len() > 1 {
|
||||||
|
return Err(crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Duplicate value {} for unique index {}", field_value, index_name)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.index_data.insert(index_name.to_string(), index_map);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Обновление индекса при изменении документа
|
||||||
|
fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> {
|
||||||
|
let indexes = self.indexes.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let new_doc_value: Value = serde_json::from_slice(new_document)
|
||||||
|
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||||
|
|
||||||
|
let old_doc_value: Option<Value> = old_document
|
||||||
|
.and_then(|doc| serde_json::from_slice(doc).ok());
|
||||||
|
|
||||||
|
for (index_name, index) in indexes.iter() {
|
||||||
|
if let Some(mut index_map) = self.index_data.get_mut(index_name) {
|
||||||
|
// Удаляем старые значения из индекса
|
||||||
|
if let Some(old_doc) = &old_doc_value {
|
||||||
|
if let Some(old_value) = old_doc.get(&index.field) {
|
||||||
|
let old_value_str = old_value.to_string();
|
||||||
|
if let Some(entries) = index_map.get_mut(&old_value_str) {
|
||||||
|
entries.retain(|id| id != document_id);
|
||||||
|
if entries.is_empty() {
|
||||||
|
index_map.remove(&old_value_str);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Добавляем новые значения в индекс
|
||||||
|
if let Some(new_value) = new_doc_value.get(&index.field) {
|
||||||
|
let new_value_str = new_value.to_string();
|
||||||
|
let entries = index_map.entry(new_value_str).or_insert_with(Vec::new);
|
||||||
|
|
||||||
|
// Проверка уникальности
|
||||||
|
if index.unique && !entries.is_empty() && entries[0] != document_id {
|
||||||
|
return Err(crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Duplicate value {} for unique index {}", new_value, index_name)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !entries.contains(&document_id.to_string()) {
|
||||||
|
entries.push(document_id.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Поиск по индексу
|
||||||
|
pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result<Vec<String>> {
|
||||||
|
let index_map = self.index_data.get(index_name)
|
||||||
|
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Index not found: {}", index_name)
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let value_str = value.to_string();
|
||||||
|
Ok(index_map.get(&value_str).cloned().unwrap_or_default())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free создание документа с временной меткой
|
||||||
|
pub fn create_document(&self, document: Vec<u8>) -> Result<String> {
|
||||||
|
let id = Uuid::new_v4().to_string();
|
||||||
|
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// Добавляем временную метку к документу
|
||||||
|
let document_with_timestamp = self.add_timestamp_to_document(document, "create")?;
|
||||||
|
|
||||||
|
let mut documents = self.documents.write()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// Проверяем уникальность перед вставкой
|
||||||
|
self.update_indexes(None, &document_with_timestamp, &id)?;
|
||||||
|
|
||||||
|
documents.insert(id.clone(), document_with_timestamp);
|
||||||
|
|
||||||
|
// Логируем операцию
|
||||||
|
self.log_operation("create", &id);
|
||||||
|
|
||||||
|
println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq);
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free чтение документа
|
||||||
|
pub fn read_document(&self, id: &str) -> Result<Option<Vec<u8>>> {
|
||||||
|
let documents = self.documents.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// Логируем операцию чтения
|
||||||
|
self.log_operation("read", id);
|
||||||
|
|
||||||
|
Ok(documents.get(id).cloned())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free обновление документа с временной меткой
|
||||||
|
pub fn update_document(&self, id: &str, document: Vec<u8>) -> Result<()> {
|
||||||
|
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// Добавляем временную метку к документу
|
||||||
|
let document_with_timestamp = self.add_timestamp_to_document(document, "update")?;
|
||||||
|
|
||||||
|
let mut documents = self.documents.write()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
if let Some(old_document) = documents.get(id) {
|
||||||
|
// Обновляем индексы
|
||||||
|
self.update_indexes(Some(old_document), &document_with_timestamp, id)?;
|
||||||
|
|
||||||
|
documents.insert(id.to_string(), document_with_timestamp);
|
||||||
|
|
||||||
|
// Логируем операцию
|
||||||
|
self.log_operation("update", id);
|
||||||
|
|
||||||
|
println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Document not found: {}", id)
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free удаление документа с временной меткой
|
||||||
|
pub fn delete_document(&self, id: &str) -> Result<()> {
|
||||||
|
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let mut documents = self.documents.write()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
if let Some(old_document) = documents.get(id) {
|
||||||
|
// Удаляем из индексов
|
||||||
|
self.update_indexes(Some(old_document), &[], id)?;
|
||||||
|
|
||||||
|
documents.remove(id);
|
||||||
|
|
||||||
|
// Логируем операцию
|
||||||
|
self.log_operation("delete", id);
|
||||||
|
|
||||||
|
println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(crate::common::FutriixError::DatabaseError(
|
||||||
|
format!("Document not found: {}", id)
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free запрос документов
|
||||||
|
pub fn query_documents(&self, _filter: Vec<u8>) -> Result<Vec<Vec<u8>>> {
|
||||||
|
let documents = self.documents.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// Логируем операцию запроса
|
||||||
|
self.log_operation("query", "multiple");
|
||||||
|
|
||||||
|
// TODO: Реализовать wait-free фильтрацию на основе filter
|
||||||
|
let documents: Vec<Vec<u8>> = documents.values().cloned().collect();
|
||||||
|
|
||||||
|
Ok(documents)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Получение имени коллекции (wait-free)
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn get_name(&self) -> &str {
|
||||||
|
&self.name
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Получение количества документов (wait-free)
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn count_documents(&self) -> Result<usize> {
|
||||||
|
let documents = self.documents.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
Ok(documents.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free база данных
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Database {
|
||||||
|
collections: Arc<DashMap<String, Collection>>,
|
||||||
|
procedures: Arc<DashMap<String, Vec<u8>>>,
|
||||||
|
transactions: Arc<DashMap<String, Vec<protocol::Command>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Database {
|
||||||
|
/// Создание новой wait-free базы данных
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
collections: Arc::new(DashMap::new()),
|
||||||
|
procedures: Arc::new(DashMap::new()),
|
||||||
|
transactions: Arc::new(DashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free получение или создание коллекции
|
||||||
|
pub fn get_collection(&self, name: &str) -> Collection {
|
||||||
|
if let Some(collection) = self.collections.get(name) {
|
||||||
|
return collection.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Создаем новую коллекцию wait-free способом
|
||||||
|
let new_collection = Collection::new(name.to_string());
|
||||||
|
self.collections.insert(name.to_string(), new_collection.clone());
|
||||||
|
|
||||||
|
new_collection
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free выполнение команды
|
||||||
|
pub fn execute_command(&self, command: protocol::Command) -> Result<protocol::Response> {
|
||||||
|
match command {
|
||||||
|
protocol::Command::Create { collection, document } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
match coll.create_document(document) {
|
||||||
|
Ok(id) => Ok(protocol::Response::Success(id.into_bytes())),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::Read { collection, id } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
match coll.read_document(&id) {
|
||||||
|
Ok(Some(document)) => Ok(protocol::Response::Success(document)),
|
||||||
|
Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::Update { collection, id, document } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
match coll.update_document(&id, document) {
|
||||||
|
Ok(_) => Ok(protocol::Response::Success(vec![])),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::Delete { collection, id } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
match coll.delete_document(&id) {
|
||||||
|
Ok(_) => Ok(protocol::Response::Success(vec![])),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::Query { collection, filter } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
match coll.query_documents(filter) {
|
||||||
|
Ok(documents) => {
|
||||||
|
let json_docs: Vec<Value> = documents.into_iter()
|
||||||
|
.filter_map(|doc| serde_json::from_slice(&doc).ok())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
match serde_json::to_vec(&json_docs) {
|
||||||
|
Ok(data) => Ok(protocol::Response::Success(data)),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::CreateProcedure { name, code } => {
|
||||||
|
self.procedures.insert(name, code);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::CallProcedure { name } => {
|
||||||
|
if self.procedures.contains_key(&name) {
|
||||||
|
// TODO: Выполнить Lua код процедура
|
||||||
|
Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes()))
|
||||||
|
} else {
|
||||||
|
Ok(protocol::Response::Error(format!("Procedure not found: {}", name)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::BeginTransaction { transaction_id } => {
|
||||||
|
if self.transactions.contains_key(&transaction_id) {
|
||||||
|
return Ok(protocol::Response::Error("Transaction already exists".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.transactions.insert(transaction_id, Vec::new());
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::CommitTransaction { transaction_id } => {
|
||||||
|
if let Some((_, commands)) = self.transactions.remove(&transaction_id) {
|
||||||
|
// Выполняем все команды транзакции wait-free способом
|
||||||
|
for cmd in commands {
|
||||||
|
if let Err(e) = self.execute_command(cmd) {
|
||||||
|
return Ok(protocol::Response::Error(format!("Transaction failed: {}", e)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
} else {
|
||||||
|
Ok(protocol::Response::Error("Transaction not found".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::RollbackTransaction { transaction_id } => {
|
||||||
|
if self.transactions.remove(&transaction_id).is_some() {
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
} else {
|
||||||
|
Ok(protocol::Response::Error("Transaction not found".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::CreateIndex { collection, index } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
match coll.create_index(index) {
|
||||||
|
Ok(_) => Ok(protocol::Response::Success(vec![])),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::QueryByIndex { collection, index_name, value } => {
|
||||||
|
let coll = self.get_collection(&collection);
|
||||||
|
let value: Value = serde_json::from_slice(&value)
|
||||||
|
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||||
|
|
||||||
|
match coll.query_by_index(&index_name, &value) {
|
||||||
|
Ok(document_ids) => {
|
||||||
|
let result = serde_json::to_vec(&document_ids)
|
||||||
|
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||||
|
Ok(protocol::Response::Success(result))
|
||||||
|
}
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Обработка новых команд для шардинга, constraints, компрессии и глобальных индексов
|
||||||
|
protocol::Command::AddShardNode { node_id, address, capacity } => {
|
||||||
|
// TODO: Реализовать добавление шард-узла
|
||||||
|
println!("Adding shard node: {} at {} with capacity {}", node_id, address, capacity);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::RemoveShardNode { node_id } => {
|
||||||
|
// TODO: Реализовать удаление шард-узла
|
||||||
|
println!("Removing shard node: {}", node_id);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::MigrateShard { collection, from_node, to_node, shard_key } => {
|
||||||
|
// TODO: Реализовать миграцию шарда
|
||||||
|
println!("Migrating shard from {} to {} for collection {} with key {}", from_node, to_node, collection, shard_key);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::RebalanceCluster => {
|
||||||
|
// TODO: Реализовать ребалансировку кластера
|
||||||
|
println!("Rebalancing cluster");
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::GetClusterStatus => {
|
||||||
|
// TODO: Реализовать получение статуса кластера
|
||||||
|
let status = protocol::ClusterStatus {
|
||||||
|
nodes: vec![],
|
||||||
|
total_capacity: 0,
|
||||||
|
total_used: 0,
|
||||||
|
rebalance_needed: false,
|
||||||
|
cluster_formed: false,
|
||||||
|
leader_exists: false,
|
||||||
|
raft_nodes: vec![],
|
||||||
|
};
|
||||||
|
match protocol::serialize(&status) {
|
||||||
|
Ok(data) => Ok(protocol::Response::Success(data)),
|
||||||
|
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
protocol::Command::StartElection => {
|
||||||
|
// TODO: Реализовать Raft выборы
|
||||||
|
println!("Starting Raft election");
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::GetRaftNodes => {
|
||||||
|
// TODO: Реализовать получение Raft узлов
|
||||||
|
println!("Getting Raft nodes");
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::AddConstraint { collection, constraint_name, constraint_type, field, value } => {
|
||||||
|
// TODO: Реализовать добавление constraint
|
||||||
|
println!("Adding constraint {} to collection {}: {} {} with value {:?}", constraint_name, collection, constraint_type, field, value);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::RemoveConstraint { collection, constraint_name } => {
|
||||||
|
// TODO: Реализовать удаление constraint
|
||||||
|
println!("Removing constraint {} from collection {}", constraint_name, collection);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::EnableCompression { collection, algorithm } => {
|
||||||
|
// TODO: Реализовать включение компрессии
|
||||||
|
println!("Enabling {} compression for collection {}", algorithm, collection);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::DisableCompression { collection } => {
|
||||||
|
// TODO: Реализовать отключение компрессии
|
||||||
|
println!("Disabling compression for collection {}", collection);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::CreateGlobalIndex { name, field, unique } => {
|
||||||
|
// TODO: Реализовать создание глобального индекса
|
||||||
|
println!("Creating global index {} on field {} (unique: {})", name, field, unique);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::QueryGlobalIndex { index_name, value } => {
|
||||||
|
// TODO: Реализовать запрос по глобальному индексу
|
||||||
|
println!("Querying global index {} with value {:?}", index_name, value);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
// Новые команды для CSV
|
||||||
|
protocol::Command::ImportCsv { collection, file_path } => {
|
||||||
|
// TODO: Интегрировать с CsvManager
|
||||||
|
println!("Importing CSV to collection '{}' from '{}'", collection, file_path);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::ExportCsv { collection, file_path } => {
|
||||||
|
// TODO: Интегрировать с CsvManager
|
||||||
|
println!("Exporting collection '{}' to CSV file '{}'", collection, file_path);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::ListCsvFiles => {
|
||||||
|
// TODO: Интегрировать с CsvManager
|
||||||
|
println!("Listing CSV files");
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
protocol::Command::GetImportProgress { collection } => {
|
||||||
|
// TODO: Интегрировать с CsvManager
|
||||||
|
println!("Getting import progress for '{}'", collection);
|
||||||
|
Ok(protocol::Response::Success(vec![]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait-Free получение статистики базы данных
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn get_stats(&self) -> Result<std::collections::HashMap<String, usize>> {
|
||||||
|
let mut stats = std::collections::HashMap::new();
|
||||||
|
stats.insert("collections".to_string(), self.collections.len());
|
||||||
|
stats.insert("procedures".to_string(), self.procedures.len());
|
||||||
|
stats.insert("active_transactions".to_string(), self.transactions.len());
|
||||||
|
|
||||||
|
// Подсчет документов во всех коллекциях
|
||||||
|
let total_documents: usize = self.collections.iter()
|
||||||
|
.map(|entry| entry.value().count_documents().unwrap_or(0))
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
stats.insert("total_documents".to_string(), total_documents);
|
||||||
|
|
||||||
|
Ok(stats)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Создание бэкапа базы данных
|
||||||
|
pub fn create_backup(&self) -> Result<std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>> {
|
||||||
|
let mut backup = std::collections::HashMap::new();
|
||||||
|
|
||||||
|
for entry in self.collections.iter() {
|
||||||
|
let name = entry.key().clone();
|
||||||
|
let collection = entry.value();
|
||||||
|
|
||||||
|
let documents = collection.documents.read()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
let mut collection_backup = std::collections::HashMap::new();
|
||||||
|
for (id, document) in documents.iter() {
|
||||||
|
collection_backup.insert(id.clone(), document.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
backup.insert(name, collection_backup);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(backup)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Восстановление из бэкапа
|
||||||
|
pub fn restore_from_backup(&self, backup: std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>) -> Result<()> {
|
||||||
|
// Очищаем существующие коллекции
|
||||||
|
self.collections.clear();
|
||||||
|
|
||||||
|
// Восстанавливаем данные из бэкапа
|
||||||
|
for (collection_name, documents) in backup {
|
||||||
|
let collection = Collection::new(collection_name.clone());
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut collection_docs = collection.documents.write()
|
||||||
|
.map_err(|e| crate::common::FutriixError::IoError(
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
|
||||||
|
))?;
|
||||||
|
|
||||||
|
for (id, document) in documents {
|
||||||
|
collection_docs.insert(id, document);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.collections.insert(collection_name, collection);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Добавление триггера к коллекции
|
||||||
|
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
|
||||||
|
let collection = self.get_collection(&trigger.collection);
|
||||||
|
collection.add_trigger(trigger)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user