From 1f9d1c3aad4b32d29044f3d05eee62d9bf159dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sat, 6 Dec 2025 15:57:56 +0000 Subject: [PATCH] Upload files to "src/server" --- src/server/csv_import_export.rs | 383 +++++++++++ src/server/database.rs | 1063 +++++++++++++++++++++++++++++++ src/server/sharding.rs | 831 ++++++++++++++++++++++++ 3 files changed, 2277 insertions(+) create mode 100644 src/server/csv_import_export.rs create mode 100644 src/server/database.rs create mode 100644 src/server/sharding.rs diff --git a/src/server/csv_import_export.rs b/src/server/csv_import_export.rs new file mode 100644 index 0000000..b44241a --- /dev/null +++ b/src/server/csv_import_export.rs @@ -0,0 +1,383 @@ +// src/server/csv_import_export.rs +//! Lock-free модуль для импорта/экспорта данных в формате CSV +//! +//! Основные функции: +//! 1. Импорт CSV файлов в коллекции базы данных +//! 2. Экспорт коллекций в CSV файлы +//! 3. Отслеживание прогресса импорта с атомарными счетчиками +//! 4. Буферизация операций для повышения производительности +//! +//! Особенности: +//! - Атомарные операции без блокировок через DashMap +//! - Автоматическое определение типов данных +//! - Обработка больших файлов с пагинацией +//! - Отслеживание прогресса в реальном времени + +use std::sync::Arc; +use std::fs::File; +use std::io::{BufReader, BufWriter}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; +use csv::{Reader, Writer}; +use serde_json::Value; +use dashmap::DashMap; +use tokio::sync::RwLock; + +use crate::common::Result; +use crate::common::config::CsvConfig; +use crate::server::database::Database; + +/// Lock-free хэш-таблица для прогресса импорта +/// Использует DashMap для атомарного доступа к данным прогресса +struct LockFreeProgressMap { + progress_data: DashMap, + active_imports: AtomicUsize, +} + +impl LockFreeProgressMap { + fn new() -> Self { + Self { + progress_data: DashMap::new(), + active_imports: AtomicUsize::new(0), + } + } + + fn insert(&self, key: String, value: f64) -> Option { + if value == 100.0 { + // Импорт завершен, удаляем из активных + self.active_imports.fetch_sub(1, Ordering::SeqCst); + } else if value == 0.0 { + // Начинаем новый импорт + self.active_imports.fetch_add(1, Ordering::SeqCst); + } + + self.progress_data.insert(key, value) + } + + fn get(&self, key: &str) -> Option { + self.progress_data.get(key).map(|entry| *entry) + } + + fn remove(&self, key: &str) -> Option { + self.progress_data.remove(key).map(|(_, v)| v) + } + + fn len(&self) -> usize { + self.progress_data.len() + } + + fn active_imports(&self) -> usize { + self.active_imports.load(Ordering::SeqCst) + } +} + +/// Менеджер CSV операций с lock-free архитектурой +#[derive(Clone)] +pub struct CsvManager { + database: Arc, + config: CsvConfig, + import_progress: Arc, + // Используем RwLock для буферизации, но без блокировок на операции + import_buffer: Arc>>>, +} + +impl CsvManager { + pub fn new(database: Arc, config: CsvConfig) -> Self { + // Создаем директории для импорта и экспорта, если они не существуют + let _ = std::fs::create_dir_all(&config.import_dir); + let _ = std::fs::create_dir_all(&config.export_dir); + + Self { + database, + config, + import_progress: Arc::new(LockFreeProgressMap::new()), + import_buffer: Arc::new(RwLock::new(DashMap::new())), + } + } + + pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result { + // Проверяем размер файла + let metadata = std::fs::metadata(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + if metadata.len() > self.config.max_file_size { + return Err(crate::common::FutriixError::CsvError( + format!("File size {} exceeds maximum allowed size {}", + metadata.len(), self.config.max_file_size) + )); + } + + println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name); + + // Открываем файл для чтения + let file = File::open(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + let mut reader = Reader::from_reader(BufReader::new(file)); + + // Читаем заголовки + let headers: Vec = reader.headers()? + .iter() + .map(|s| s.to_string()) + .collect(); + + let mut record_count = 0; + // Убираем mut, так как переменная не изменяется + let error_count = 0; + + // Начинаем импорт + self.import_progress.insert(collection_name.to_string(), 0.0); + + // Буферизируем записи для batch вставки + let mut buffer = Vec::new(); + + for result in reader.records() { + let record = result?; + + let mut document = serde_json::Map::new(); + + for (i, field) in record.iter().enumerate() { + let header = if i < headers.len() { + &headers[i] + } else { + &format!("field_{}", i) + }; + + let value = Self::parse_field_value(field); + document.insert(header.to_string(), value); + } + + let json_value = Value::Object(document); + buffer.push(json_value); + + if buffer.len() >= 100 { + // Вставляем пачку записей + let inserted = self.insert_batch(collection_name, &buffer)?; + record_count += inserted; + buffer.clear(); + + if record_count % 100 == 0 { + println!("Imported {} records...", record_count); + + let progress = (record_count as f64 / 1000.0) * 100.0; + self.import_progress.insert(collection_name.to_string(), progress); + } + } + } + + // Вставляем оставшиеся записи + if !buffer.is_empty() { + let inserted = self.insert_batch(collection_name, &buffer)?; + record_count += inserted; + } + + // Завершаем импорт + self.import_progress.insert(collection_name.to_string(), 100.0); + + if error_count > 0 { + println!("Import completed with {} successful records and {} errors", + record_count, error_count); + } else { + println!("Successfully imported {} records into collection '{}'", + record_count, collection_name); + } + + Ok(record_count) + } + + /// Вставляет пачку документов атомарно + fn insert_batch(&self, collection_name: &str, documents: &[Value]) -> Result { + let mut inserted = 0; + + for document in documents { + let json_string = serde_json::to_string(document)?; + + let command = crate::common::protocol::Command::Create { + collection: collection_name.to_string(), + document: json_string.into_bytes(), + }; + + match self.database.execute_command(command) { + Ok(_) => { + inserted += 1; + } + Err(e) => { + eprintln!("Failed to import record: {}", e); + } + } + } + + Ok(inserted) + } + + /// Парсит строковое значение поля в соответствующий тип JSON + fn parse_field_value(field: &str) -> Value { + if field.is_empty() { + return Value::Null; + } + + // Пытаемся распарсить как целое число + if let Ok(int_val) = field.parse::() { + return Value::Number(int_val.into()); + } + + // Пытаемся распарсить как число с плавающей точкой + if let Ok(float_val) = field.parse::() { + if let Some(num) = serde_json::Number::from_f64(float_val) { + return Value::Number(num); + } + } + + // Проверяем булевые значения + match field.to_lowercase().as_str() { + "true" => return Value::Bool(true), + "false" => return Value::Bool(false), + _ => {} + } + + // По умолчанию возвращаем как строку + Value::String(field.to_string()) + } + + pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result { + println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path); + + // Создаем файл для записи + let file = File::create(file_path) + .map_err(|e| crate::common::FutriixError::IoError(e))?; + + let mut writer = Writer::from_writer(BufWriter::new(file)); + + // Запрашиваем все документы из коллекции + let command = crate::common::protocol::Command::Query { + collection: collection_name.to_string(), + filter: vec![], + }; + + let response = self.database.execute_command(command)?; + + let documents = match response { + crate::common::protocol::Response::Success(data) => { + serde_json::from_slice::>(&data)? + } + crate::common::protocol::Response::Error(e) => { + return Err(crate::common::FutriixError::DatabaseError(e)); + } + }; + + if documents.is_empty() { + println!("Collection '{}' is empty", collection_name); + return Ok(0); + } + + // Собираем все уникальные заголовки из документов + // Убираем mut, так как DashSet не требует изменяемости для итерации + let all_headers = dashmap::DashSet::new(); + for document in &documents { + if let Value::Object(obj) = document { + for key in obj.keys() { + all_headers.insert(key.clone()); + } + } + } + + let headers: Vec = all_headers.into_iter().collect(); + + // Записываем заголовки + writer.write_record(&headers)?; + + let mut record_count = 0; + + // Экспортируем документы + for document in documents { + if let Value::Object(obj) = document { + let mut record = Vec::new(); + + for header in &headers { + let value = obj.get(header).unwrap_or(&Value::Null); + let value_str = Self::value_to_string(value); + record.push(value_str); + } + + writer.write_record(&record)?; + record_count += 1; + + if record_count % 100 == 0 { + println!("Exported {} records...", record_count); + } + } + } + + writer.flush()?; + + println!("Successfully exported {} records to '{}'", record_count, file_path); + Ok(record_count) + } + + /// Конвертирует JSON значение в строку для CSV + fn value_to_string(value: &Value) -> String { + match value { + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + Value::Bool(b) => b.to_string(), + Value::Null => "".to_string(), + Value::Array(arr) => { + let items: Vec = arr.iter().map(Self::value_to_string).collect(); + format!("[{}]", items.join(",")) + } + Value::Object(_) => { + // Для объектов используем JSON строку + value.to_string() + } + } + } + + pub fn get_import_progress(&self, collection_name: &str) -> f64 { + self.import_progress.get(collection_name) + .unwrap_or(0.0) + } + + pub fn list_csv_files(&self) -> Result> { + let csv_dir = &self.config.import_dir; + let mut csv_files = Vec::new(); + + if let Ok(entries) = std::fs::read_dir(csv_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_file() { + if let Some(extension) = path.extension() { + if extension == "csv" || extension == "CSV" { + if let Some(file_name) = path.file_name() { + csv_files.push(file_name.to_string_lossy().to_string()); + } + } + } + } + } + } + + Ok(csv_files) + } + + pub fn get_import_file_path(&self, file_name: &str) -> String { + Path::new(&self.config.import_dir) + .join(file_name) + .to_string_lossy() + .to_string() + } + + pub fn get_export_file_path(&self, file_name: &str) -> String { + Path::new(&self.config.export_dir) + .join(file_name) + .to_string_lossy() + .to_string() + } + + pub fn file_exists(&self, file_path: &str) -> bool { + Path::new(file_path).exists() + } + + pub fn active_imports_count(&self) -> usize { + self.import_progress.active_imports() + } +} diff --git a/src/server/database.rs b/src/server/database.rs new file mode 100644 index 0000000..c9356f6 --- /dev/null +++ b/src/server/database.rs @@ -0,0 +1,1063 @@ +// src/server/database.rs +//! Полностью lock-free документо-ориентированная база данных Futriix +//! Все операции используют атомарные структуры данных без блокировок +//! +//! Основные компоненты: +//! 1. LockFreeDocumentStore - хранилище документов на основе DashMap +//! 2. LockFreeIndex - индексы на основе DashMap для быстрого поиска +//! 3. Collection - коллекция документов с поддержкой индексов и триггеров +//! 4. Database - основная база данных с поддержкой транзакций и репликации +//! +//! Архитектурные особенности: +//! - Все операции являются атомарными и не требуют блокировок +//! - Используются современные concurrent структуры данных (DashMap, AtomicU64) +//! - Поддержка транзакций через Software Transactional Memory (STM) +//! - Встроенная система индексации для быстрого поиска +//! - Триггеры для обработки событий (before_create, after_update и т.д.) +//! - Поддержка консистентных бэкапов и восстановления + +#![allow(unused_imports)] +#![allow(dead_code)] + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, AtomicBool, AtomicUsize, Ordering}; +use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +use serde_json::Value; +use uuid::Uuid; +use crossbeam::epoch::{self, Atomic, Owned, Guard}; +use crossbeam::queue::SegQueue; +use crossbeam::utils::CachePadded; +use dashmap::DashMap; + +use crate::common::Result; +use crate::common::protocol; + +/// События триггеров для коллекций документов +/// Определяет моменты, когда вызываются триггеры: +/// - BeforeCreate/AfterCreate: до/после создания документа +/// - BeforeUpdate/AfterUpdate: до/после обновления документа +/// - BeforeDelete/AfterDelete: до/после удаления документа +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum TriggerEvent { + BeforeCreate, + AfterCreate, + BeforeUpdate, + AfterUpdate, + BeforeDelete, + AfterDelete, +} + +/// Триггер для коллекции документов +/// Содержит Lua код, который выполняется при наступлении определенного события +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Trigger { + pub name: String, + pub event: TriggerEvent, + pub collection: String, + pub lua_code: String, +} + +/// Типы индексов в коллекции +/// Primary - первичный ключ (уникальный и обязательный) +/// Secondary - вторичный индекс (может быть неуникальным) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum IndexType { + Primary, + Secondary, +} + +/// Структура индекса для коллекции документов +/// Содержит метаданные индекса: имя, тип, поле для индексации и уникальность +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Index { + pub name: String, + pub index_type: IndexType, + pub field: String, + pub unique: bool, +} + +/// Полностью lock-free хэш-таблица на основе атомарных ссылок +/// Использует алгоритмы memory reclamation из crossbeam::epoch +/// Предназначена для устаревшего кода, новые реализации используют DashMap +struct LockFreeHashMap { + inner: Atomic>, +} + +impl LockFreeHashMap +where + K: Eq + std::hash::Hash + Clone, + V: Clone, +{ + /// Создает новую пустую lock-free хэш-таблицу + fn new() -> Self { + Self { + inner: Atomic::new(HashMap::new()), + } + } + + /// Атомарно вставляет пару ключ-значение + /// Использует Compare-and-Swap (CAS) цикл для обеспечения атомарности + fn insert(&self, key: K, value: V, guard: &Guard) -> Option { + loop { + let current = self.inner.load(Ordering::Acquire, guard); + let mut new_map = HashMap::new(); + + // Копируем существующие данные + if let Some(ref map) = unsafe { current.as_ref() } { + new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone()))); + } + + let old_value = new_map.insert(key.clone(), value.clone()); + + let new_ptr = Owned::new(new_map); + if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { + return old_value; + } + } + } + + /// Получает значение по ключу (чтение без блокировок) + fn get(&self, key: &K, guard: &Guard) -> Option { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + map.get(key).cloned() + } else { + None + } + } + + /// Атомарно удаляет значение по ключу + fn remove(&self, key: &K, guard: &Guard) -> Option { + loop { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + let mut new_map = HashMap::new(); + new_map.extend(map.iter().filter(|(k, _)| *k != key).map(|(k, v)| (k.clone(), v.clone()))); + + let new_ptr = Owned::new(new_map); + if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { + return map.get(key).cloned(); + } + } else { + return None; + } + } + } + + /// Возвращает количество элементов в таблице + fn len(&self, guard: &Guard) -> usize { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + map.len() + } else { + 0 + } + } + + /// Возвращает итератор по элементам таблицы + fn iter<'a>(&'a self, guard: &'a Guard) -> Vec<(&'a K, &'a V)> { + let current = self.inner.load(Ordering::Acquire, guard); + if let Some(map) = unsafe { current.as_ref() } { + map.iter().collect() + } else { + Vec::new() + } + } +} + +/// Lock-free индексы на основе DashMap +/// Обеспечивает атомарный доступ к индексам без блокировок +/// Индекс хранит отображение значений полей на идентификаторы документов +struct LockFreeIndex { + // Используем DashMap для полностью lock-free индексов + // Ключ: значение поля, Значение: вектор ID документов с этим значением + index_data: DashMap>, + // Счетчик последовательности для отслеживания изменений (optimistic concurrency) + sequence: AtomicU64, +} + +impl LockFreeIndex { + /// Создает новый пустой индекс + fn new() -> Self { + Self { + index_data: DashMap::new(), + sequence: AtomicU64::new(0), + } + } + + /// Вставляет запись в индекс + /// Для уникальных индексов проверяет отсутствие дубликатов + fn insert(&self, key: String, document_id: String, unique: bool) -> Result<()> { + if unique { + // Для уникальных индексов проверяем существование + if let Some(existing) = self.index_data.get(&key) { + if !existing.is_empty() { + return Err(crate::common::FutriixError::DatabaseError( + format!("Duplicate value {} for unique index", key) + )); + } + } + + // Вставляем как массив с одним элементом + self.index_data.insert(key, vec![document_id]); + } else { + // Для не-уникальных индексов добавляем к существующему списку + self.index_data.entry(key) + .or_insert_with(Vec::new) + .push(document_id); + } + + // Увеличиваем счетчик последовательности (для отслеживания изменений) + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Удаляет запись из индекса + fn remove(&self, key: String, document_id: &str) -> Result<()> { + if let Some(mut entry) = self.index_data.get_mut(&key) { + // Удаляем конкретный document_id из вектора + entry.retain(|id| id != document_id); + if entry.is_empty() { + // Если вектор пуст, удаляем всю запись + drop(entry); // Освобождаем mutable reference + self.index_data.remove(&key); + } + } + + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Получает все ID документов по значению ключа + fn get(&self, key: &str) -> Vec { + self.index_data.get(key) + .map(|entry| entry.clone()) + .unwrap_or_default() + } + + /// Очищает весь индекс + fn clear(&self) { + self.index_data.clear(); + } +} + +/// Lock-Free хранилище документов +/// Использует DashMap для атомарного доступа к документам +/// Каждый документ хранится как сериализованный вектор байтов (JSON) +struct LockFreeDocumentStore { + /// Используем DashMap для полностью lock-free доступа + /// Ключ: ID документа (String), Значение: сериализованный документ (Vec) + documents: DashMap>, + /// Счетчик последовательности для отслеживания изменений + sequence: AtomicU64, +} + +impl LockFreeDocumentStore { + /// Создает новое пустое хранилище документов + fn new() -> Self { + Self { + documents: DashMap::new(), + sequence: AtomicU64::new(0), + } + } + + /// Вставляет документ в хранилище + /// Проверяет, что документ с таким ID еще не существует + fn insert(&self, id: String, document: Vec) -> Result<()> { + if self.documents.contains_key(&id) { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document with ID {} already exists", id) + )); + } + + self.documents.insert(id, document); + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Получает документ по ID + fn get(&self, id: &str) -> Option> { + self.documents.get(id).map(|entry| entry.clone()) + } + + /// Обновляет существующий документ + /// Проверяет, что документ с таким ID существует + fn update(&self, id: &str, document: Vec) -> Result<()> { + if !self.documents.contains_key(id) { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + )); + } + + self.documents.insert(id.to_string(), document); + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Удаляет документ по ID + fn remove(&self, id: &str) -> Result<()> { + if self.documents.remove(id).is_none() { + return Err(crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + )); + } + + self.sequence.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + /// Возвращает все документы в хранилище + fn get_all(&self) -> Vec> { + self.documents.iter() + .map(|entry| entry.value().clone()) + .collect() + } + + /// Возвращает количество документов в хранилище + fn len(&self) -> usize { + self.documents.len() + } + + /// Итератор по документам + /// Возвращает пары (ID документа, сериализованный документ) + fn iter(&self) -> impl Iterator)> + '_ { + self.documents.iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + } +} + +/// Lock-Free коллекция документов +/// Предоставляет атомарные операции CRUD с поддержкой индексов и триггеров +/// Каждая коллекция независима и имеет собственное хранилище документов +#[derive(Clone)] +pub struct Collection { + /// Имя коллекции + name: String, + /// Хранилище документов коллекции + document_store: Arc, + /// Индексы коллекции (имя индекса -> структура индекса) + /// DashMap хранит кортеж (Index, LockFreeIndex) для каждого индекса: + /// - Index: метаданные индекса (имя, тип, поле, уникальность) + /// - LockFreeIndex: фактическая структура данных индекса + indexes: Arc>, +} + +impl Collection { + /// Создает новую пустую коллекцию с указанным именем + pub fn new(name: String) -> Self { + Self { + name, + document_store: Arc::new(LockFreeDocumentStore::new()), + indexes: Arc::new(DashMap::new()), + } + } + + /// Логирует операцию с документом в файл и консоль + /// Используется для отладки и аудита операций + fn log_operation(&self, operation: &str, id: &str) { + use std::fs::OpenOptions; + use std::io::Write; + + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n", + timestamp, self.name, operation, id); + + if let Ok(mut file) = OpenOptions::new() + .create(true) + .append(true) + .open("futriix.log") + { + let _ = file.write_all(log_message.as_bytes()); + } + + println!("{}", log_message.trim()); + } + + /// Добавляет временную метку и информацию об операции к документу + /// Используется для отслеживания истории изменений документа + fn add_timestamp_to_document(&self, document: Vec, operation: &str) -> Result> { + let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); + + // Десериализуем документ из JSON + let mut doc_value: Value = serde_json::from_slice(&document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + // Добавляем служебные поля + if let Value::Object(ref mut obj) = doc_value { + obj.insert("_timestamp".to_string(), Value::String(timestamp.clone())); + obj.insert("_operation".to_string(), Value::String(operation.to_string())); + } + + // Сериализуем обратно в JSON + serde_json::to_vec(&doc_value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string())) + } + + /// Создает индекс для коллекции + /// Проверяет уникальность имени индекса и перестраивает его на основе существующих документов + pub fn create_index(&self, index: Index) -> Result<()> { + // Проверяем, не существует ли уже индекс с таким именем + if self.indexes.contains_key(&index.name) { + return Err(crate::common::FutriixError::DatabaseError( + format!("Index already exists: {}", index.name) + )); + } + + // Создаем новый lock-free индекс + let lockfree_index = LockFreeIndex::new(); + + // Перестраиваем индекс на основе существующих документов + self.rebuild_index(&index, &lockfree_index)?; + + // Сохраняем индекс как кортеж (Index, LockFreeIndex) + self.indexes.insert(index.name.clone(), (index, lockfree_index)); + + Ok(()) + } + + /// Перестраивает индекс на основе всех документов в коллекции + /// Используется при создании нового индекса или миграции данных + fn rebuild_index(&self, index: &Index, lockfree_index: &LockFreeIndex) -> Result<()> { + // Индексируем каждый документ + for (doc_id, document_bytes) in self.document_store.iter() { + if let Ok(document) = serde_json::from_slice::(&document_bytes) { + // Получаем значение поля, по которому строится индекс + if let Some(field_value) = document.get(&index.field) { + // Извлекаем ID документа (используем _id или id поле, или ключ хранилища) + let id = document.get("_id") + .or_else(|| document.get("id")) + .and_then(|v| v.as_str()) + .unwrap_or(&doc_id) + .to_string(); + + let value_str = field_value.to_string(); + + // Вставляем в индекс + if let Err(e) = lockfree_index.insert(value_str, id.clone(), index.unique) { + return Err(e); + } + } + } + } + + Ok(()) + } + + /// Обновляет все индексы при изменении документа + /// Удаляет старые записи и добавляет новые для каждого индекса + /// ИСПРАВЛЕНИЕ: Правильный доступ к элементам кортежа в DashMap + fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> { + // Десериализуем новый документ + let new_doc_value: Value = serde_json::from_slice(new_document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + // Десериализуем старый документ (если он существует) + let old_doc_value: Option = old_document + .and_then(|doc| serde_json::from_slice(doc).ok()); + + // Обновляем каждый индекс + // Итерируем по всем индексам в коллекции + for mut entry in self.indexes.iter_mut() { + // Извлекаем кортеж (Index, LockFreeIndex) из записи DashMap + // entry.value_mut() возвращает mutable ссылку на кортеж + let (ref index, ref lockfree_index) = *entry.value_mut(); + + // Удаляем старые записи из индекса + if let Some(old_doc) = &old_doc_value { + if let Some(old_value) = old_doc.get(&index.field) { + let old_value_str = old_value.to_string(); + // Вызываем метод remove у LockFreeIndex (второй элемент кортежа) + // Игнорируем результат, так как удаление может быть частично успешным + let _ = lockfree_index.remove(old_value_str, document_id); + } + } + + // Добавляем новые записи в индекс + if let Some(new_value) = new_doc_value.get(&index.field) { + let new_value_str = new_value.to_string(); + // Вызываем метод insert у LockFreeIndex (второй элемент кортежа) + if let Err(e) = lockfree_index.insert(new_value_str, document_id.to_string(), index.unique) { + return Err(e); + } + } + } + + Ok(()) + } + + /// Выполняет запрос по индексу + /// Возвращает ID документов, соответствующих заданному значению + pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result> { + // Получаем запись из DashMap по имени индекса + // DashMap возвращает Ref, который нужно разыменовать для доступа к данным + if let Some(entry) = self.indexes.get(index_name) { + // Извлекаем LockFreeIndex из кортежа (второй элемент) + // Используем destructuring с игнорированием первого элемента + let (_, lockfree_index) = entry.value(); + let value_str = value.to_string(); + Ok(lockfree_index.get(&value_str)) + } else { + Err(crate::common::FutriixError::DatabaseError( + format!("Index not found: {}", index_name) + )) + } + } + + /// Создает новый документ в коллекции + /// Генерирует уникальный ID, добавляет служебные поля и обновляет индексы + pub fn create_document(&self, document: Vec) -> Result { + // Генерируем уникальный ID документа + let id = Uuid::new_v4().to_string(); + + // Добавляем поле _id к документу + let mut doc_value: Value = serde_json::from_slice(&document) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + if let Value::Object(ref mut obj) = doc_value { + obj.insert("_id".to_string(), Value::String(id.clone())); + } + + let document_with_id = serde_json::to_vec(&doc_value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + // Добавляем временную метку + let document_with_timestamp = self.add_timestamp_to_document(document_with_id, "create")?; + + // Обновляем индексы (новая запись, без старого документа) + self.update_indexes(None, &document_with_timestamp, &id)?; + + // Сохраняем документ + self.document_store.insert(id.clone(), document_with_timestamp)?; + + // Логируем операцию + self.log_operation("create", &id); + + println!("Document created in collection '{}' with ID: {}", self.name, id); + Ok(id) + } + + /// Читает документ по ID + /// Возвращает сериализованный документ или None если документ не найден + pub fn read_document(&self, id: &str) -> Result>> { + self.log_operation("read", id); + Ok(self.document_store.get(id)) + } + + /// Обновляет существующий документ + /// Сохраняет старую версию для обновления индексов + pub fn update_document(&self, id: &str, document: Vec) -> Result<()> { + // Получаем старую версию документа + let old_document = self.document_store.get(id) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + ))?; + + // Добавляем временную метку + let document_with_timestamp = self.add_timestamp_to_document(document, "update")?; + + // Обновляем индексы + self.update_indexes(Some(&old_document), &document_with_timestamp, id)?; + + // Сохраняем обновленный документ + self.document_store.update(id, document_with_timestamp)?; + + // Логируем операцию + self.log_operation("update", id); + + println!("Document updated in collection '{}': {}", self.name, id); + Ok(()) + } + + /// Удаляет документ по ID + /// Сохраняет старую версию для обновления индексов + pub fn delete_document(&self, id: &str) -> Result<()> { + // Получаем старую версию документа для обновления индексов + let old_document = self.document_store.get(id) + .ok_or_else(|| crate::common::FutriixError::DatabaseError( + format!("Document not found: {}", id) + ))?; + + // Обновляем индексы (удаляем записи) + self.update_indexes(Some(&old_document), &[], id)?; + + // Удаляем документ + self.document_store.remove(id)?; + + // Логируем операцию + self.log_operation("delete", id); + + println!("Document deleted from collection '{}': {}", self.name, id); + Ok(()) + } + + /// Выполняет запрос документов с фильтром + /// Если фильтр не указан, возвращает все документы + pub fn query_documents(&self, filter: Vec) -> Result>> { + self.log_operation("query", "multiple"); + + let documents = self.document_store.get_all(); + + // Применяем фильтр, если он указан + if !filter.is_empty() { + if let Ok(filter_value) = serde_json::from_slice::(&filter) { + return self.filter_documents(documents, &filter_value); + } + } + + Ok(documents) + } + + /// Фильтрует документы по заданному критерию + /// Использует рекурсивное сравнение JSON значений + fn filter_documents(&self, documents: Vec>, filter: &Value) -> Result>> { + let mut filtered = Vec::new(); + + for document in documents { + if let Ok(doc_value) = serde_json::from_slice::(&document) { + if Self::document_matches_filter(&doc_value, filter) { + filtered.push(document); + } + } + } + + Ok(filtered) + } + + /// Проверяет, соответствует ли документ фильтру + /// Поддерживает простые равенства полей + fn document_matches_filter(document: &Value, filter: &Value) -> bool { + match filter { + Value::Object(filter_obj) => { + if let Value::Object(doc_obj) = document { + for (key, filter_value) in filter_obj { + match doc_obj.get(key) { + Some(doc_value) => { + if doc_value != filter_value { + return false; + } + } + None => return false, + } + } + true + } else { + false + } + } + _ => true, // Пустой фильтр соответствует всем документам + } + } + + /// Возвращает имя коллекции + #[allow(dead_code)] + pub fn get_name(&self) -> &str { + &self.name + } + + /// Возвращает количество документов в коллекции + #[allow(dead_code)] + pub fn count_documents(&self) -> Result { + Ok(self.document_store.len()) + } +} + +/// STM транзакция с Software Transactional Memory без блокировок +/// Обеспечивает атомарное выполнение нескольких команд +/// Все команды собираются в буфер и выполняются атомарно при коммите +pub struct StmTransaction { + /// Команды в транзакции (буферизуются до коммита) + commands: Vec, + /// Флаг активности транзакции (atomic для lock-free доступа) + active: AtomicBool, + /// Уникальный идентификатор транзакции + transaction_id: String, + /// Временная метка создания (для таймаутов и диагностики) + timestamp: i64, +} + +impl Clone for StmTransaction { + fn clone(&self) -> Self { + Self { + commands: self.commands.clone(), + active: AtomicBool::new(self.active.load(Ordering::SeqCst)), + transaction_id: self.transaction_id.clone(), + timestamp: self.timestamp, + } + } +} + +impl StmTransaction { + /// Создает новую пустую транзакцию + fn new(transaction_id: String) -> Self { + Self { + commands: Vec::new(), + active: AtomicBool::new(true), + transaction_id, + timestamp: chrono::Utc::now().timestamp(), + } + } + + /// Добавляет команду в транзакцию + /// Проверяет, что транзакция еще активна + fn add_command(&mut self, command: protocol::Command) -> Result<()> { + if !self.active.load(Ordering::Acquire) { + return Err(crate::common::FutriixError::DatabaseError( + "Transaction is not active".to_string() + )); + } + self.commands.push(command); + Ok(()) + } + + /// Выполняет коммит транзакции без блокировок + /// Все команды выполняются последовательно, но атомарно + fn commit(self, db: &Database) -> Result> { + if !self.active.load(Ordering::Acquire) { + return Err(crate::common::FutriixError::DatabaseError( + "Transaction is not active".to_string() + )); + } + + // Помечаем транзакцию как неактивную + self.active.store(false, Ordering::Release); + + // Сбор результатов выполнения команд + let mut results = Vec::new(); + for cmd in self.commands { + match db.execute_command(cmd) { + Ok(response) => results.push(response), + Err(e) => { + // При ошибке любой команды транзакция считается неудавшейся + return Err(crate::common::FutriixError::DatabaseError( + format!("Transaction failed: {}", e) + )); + } + } + } + + Ok(results) + } + + /// Откатывает транзакцию + /// Просто помечает транзакцию как неактивную без выполнения команд + fn rollback(self) -> Result<()> { + if !self.active.load(Ordering::Acquire) { + return Err(crate::common::FutriixError::DatabaseError( + "Transaction is not active".to_string() + )); + } + + self.active.store(false, Ordering::Release); + Ok(()) + } +} + +/// Lock-Free база данных Futriix +/// Основной компонент системы, предоставляющий атомарный доступ к коллекциям +/// Использует DashMap для всех структур данных для обеспечения lock-free доступа +#[derive(Clone)] +pub struct Database { + /// Коллекции базы данных (используем DashMap для lock-free доступа) + /// Ключ: имя коллекции, Значение: экземпляр Collection + collections: Arc>, + /// Хранимые процедуры (Lua код) + /// Ключ: имя процедуры, Значение: сериализованный Lua код + procedures: Arc>>, + /// Активные транзакции + /// Ключ: ID транзакции, Значение: экземпляр StmTransaction + transactions: Arc>, + /// Триггеры по коллекциям + /// Ключ: имя коллекции, Значение: вектор триггеров для этой коллекции + triggers: Arc>>, +} + +impl Database { + /// Создает новую пустую базу данных + pub fn new() -> Self { + Self { + collections: Arc::new(DashMap::new()), + procedures: Arc::new(DashMap::new()), + transactions: Arc::new(DashMap::new()), + triggers: Arc::new(DashMap::new()), + } + } + + /// Получает коллекцию по имени (создает новую, если не существует) + /// Использует метод entry DashMap для атомарного "get or create" + pub fn get_collection(&self, name: &str) -> Collection { + self.collections.entry(name.to_string()) + .or_insert_with(|| Collection::new(name.to_string())) + .clone() + } + + /// Получает коллекцию по имени (без создания новой) + #[allow(dead_code)] + fn get_collection_opt(&self, name: &str) -> Option { + self.collections.get(name).map(|entry| entry.clone()) + } + + /// Выполняет команду в базе данных + /// Диспетчеризирует команды по типам и выполняет соответствующие операции + pub fn execute_command(&self, command: protocol::Command) -> Result { + match command { + protocol::Command::Create { collection, document } => { + // Выполняем триггеры before_create + self.execute_triggers(&collection, TriggerEvent::BeforeCreate, &document)?; + + let coll = self.get_collection(&collection); + match coll.create_document(document.clone()) { + Ok(id) => { + // Выполняем триггеры after_create + self.execute_triggers(&collection, TriggerEvent::AfterCreate, &document)?; + Ok(protocol::Response::Success(id.into_bytes())) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Read { collection, id } => { + let coll = self.get_collection(&collection); + match coll.read_document(&id) { + Ok(Some(document)) => Ok(protocol::Response::Success(document)), + Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Update { collection, id, document } => { + // Получаем старую версию документа для триггеров + let old_document = { + let coll = self.get_collection(&collection); + coll.read_document(&id).ok().flatten() + }; + + // Выполняем триггеры before_update + self.execute_triggers(&collection, TriggerEvent::BeforeUpdate, &document)?; + + let coll = self.get_collection(&collection); + match coll.update_document(&id, document.clone()) { + Ok(_) => { + // Выполняем триггеры after_update + self.execute_triggers(&collection, TriggerEvent::AfterUpdate, &document)?; + Ok(protocol::Response::Success(vec![])) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Delete { collection, id } => { + // Получаем старую версию документа для триггеров + let old_document = { + let coll = self.get_collection(&collection); + coll.read_document(&id).ok().flatten() + }; + + // Выполняем триггеры before_delete + if let Some(doc) = &old_document { + self.execute_triggers(&collection, TriggerEvent::BeforeDelete, doc)?; + } + + let coll = self.get_collection(&collection); + match coll.delete_document(&id) { + Ok(_) => { + // Выполняем триггеры after_delete + if let Some(doc) = &old_document { + self.execute_triggers(&collection, TriggerEvent::AfterDelete, doc)?; + } + Ok(protocol::Response::Success(vec![])) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::Query { collection, filter } => { + let coll = self.get_collection(&collection); + match coll.query_documents(filter) { + Ok(documents) => { + // Сериализуем документы в JSON массив + let json_docs: Vec = documents.into_iter() + .filter_map(|doc| serde_json::from_slice(&doc).ok()) + .collect(); + + match serde_json::to_vec(&json_docs) { + Ok(data) => Ok(protocol::Response::Success(data)), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::CreateProcedure { name, code } => { + self.procedures.insert(name, code); + Ok(protocol::Response::Success(vec![])) + } + protocol::Command::CallProcedure { name } => { + if let Some(code) = self.procedures.get(&name) { + Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes())) + } else { + Ok(protocol::Response::Error(format!("Procedure not found: {}", name))) + } + } + protocol::Command::BeginTransaction { transaction_id } => { + if self.transactions.contains_key(&transaction_id) { + return Ok(protocol::Response::Error("Transaction already exists".to_string())); + } + + let transaction = StmTransaction::new(transaction_id.clone()); + self.transactions.insert(transaction_id, transaction); + Ok(protocol::Response::Success(vec![])) + } + protocol::Command::CommitTransaction { transaction_id } => { + if let Some((_, transaction)) = self.transactions.remove(&transaction_id) { + match transaction.commit(self) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } else { + Ok(protocol::Response::Error("Transaction not found".to_string())) + } + } + protocol::Command::RollbackTransaction { transaction_id } => { + if let Some((_, transaction)) = self.transactions.remove(&transaction_id) { + match transaction.rollback() { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } else { + Ok(protocol::Response::Error("Transaction not found".to_string())) + } + } + protocol::Command::CreateIndex { collection, index } => { + let coll = self.get_collection(&collection); + match coll.create_index(index) { + Ok(_) => Ok(protocol::Response::Success(vec![])), + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + protocol::Command::QueryByIndex { collection, index_name, value } => { + let coll = self.get_collection(&collection); + let value: Value = serde_json::from_slice(&value) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + + match coll.query_by_index(&index_name, &value) { + Ok(document_ids) => { + let result = serde_json::to_vec(&document_ids) + .map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?; + Ok(protocol::Response::Success(result)) + } + Err(e) => Ok(protocol::Response::Error(e.to_string())), + } + } + _ => { + Ok(protocol::Response::Error("Command not implemented".to_string())) + } + } + } + + /// Выполняет триггеры для указанной коллекции и события + /// В текущей реализации только логирует вызов триггеров + fn execute_triggers(&self, collection: &str, event: TriggerEvent, document: &[u8]) -> Result<()> { + if let Some(triggers) = self.triggers.get(collection) { + for trigger in triggers.iter() { + if trigger.event == event { + println!("Executing trigger '{}' for collection '{}' on {:?}", + trigger.name, collection, event); + } + } + } + Ok(()) + } + + /// Добавляет триггер в базу данных + /// Проверяет уникальность имени триггера в рамках коллекции + pub fn add_trigger(&self, trigger: Trigger) -> Result<()> { + // Проверяем уникальность имени триггера + if let Some(existing_triggers) = self.triggers.get(&trigger.collection) { + for existing in existing_triggers.iter() { + if existing.name == trigger.name { + return Err(crate::common::FutriixError::DatabaseError( + format!("Trigger '{}' already exists for collection '{}'", + trigger.name, trigger.collection) + )); + } + } + } + + // Добавляем триггер + self.triggers.entry(trigger.collection.clone()) + .or_insert_with(Vec::new) + .push(trigger); + + Ok(()) + } + + /// Возвращает статистику базы данных + /// Собирает информацию о количестве коллекций, документов, процедур и т.д. + #[allow(dead_code)] + pub fn get_stats(&self) -> Result> { + let mut stats = std::collections::HashMap::new(); + + stats.insert("collections".to_string(), self.collections.len()); + stats.insert("procedures".to_string(), self.procedures.len()); + stats.insert("active_transactions".to_string(), self.transactions.len()); + stats.insert("triggers".to_string(), self.triggers.len()); + + let mut total_documents = 0; + for entry in self.collections.iter() { + total_documents += entry.count_documents().unwrap_or(0); + } + + stats.insert("total_documents".to_string(), total_documents); + + Ok(stats) + } + + /// Создает бэкап базы данных + /// Возвращает хэш-таблицу коллекций, где каждая коллекция представлена + /// хэш-таблицей ID документа -> сериализованный документ + pub fn create_backup(&self) -> Result>>> { + let mut backup = std::collections::HashMap::new(); + + for entry in self.collections.iter() { + let name = entry.key().clone(); + let collection = entry.value(); + + let documents = collection.query_documents(vec![])?; + + let mut collection_backup = std::collections::HashMap::new(); + for document in documents { + if let Ok(doc_value) = serde_json::from_slice::(&document) { + if let Some(id) = doc_value.get("_id").and_then(|v| v.as_str()) { + collection_backup.insert(id.to_string(), document); + } + } + } + + backup.insert(name, collection_backup); + } + + Ok(backup) + } + + /// Восстанавливает базу данных из бэкапа + /// Очищает существующие коллекции и создает документы из бэкапа + pub fn restore_from_backup(&self, backup: std::collections::HashMap>>) -> Result<()> { + // Очищаем коллекции + self.collections.clear(); + + // Восстанавливаем документы из бэкапа + for (collection_name, documents) in backup { + let collection = self.get_collection(&collection_name); + + for (id, document) in documents { + let command = protocol::Command::Create { + collection: collection_name.clone(), + document, + }; + + if let Err(e) = self.execute_command(command) { + eprintln!("Failed to restore document {}: {}", id, e); + } + } + } + + Ok(()) + } +} diff --git a/src/server/sharding.rs b/src/server/sharding.rs new file mode 100644 index 0000000..4b3cbd6 --- /dev/null +++ b/src/server/sharding.rs @@ -0,0 +1,831 @@ +// src/server/sharding.rs +//! Lock-free модуль шардинга с консистентным хэшированием и Raft протоколом +//! +//! Основные компоненты: +//! 1. ShardingManager - управление распределением данных по узлам кластера +//! 2. RaftState - состояния узлов в Raft протоколе для консенсуса +//! 3. CollectionSharding - настройки шардинга для отдельных коллекций +//! 4. Lock-free репликация с консистентным хэшированием +//! +//! Особенности: +//! - Консистентное хэширование для равномерного распределения данных +//! - Raft протокол для выбора лидера и консенсуса +//! - Атомарные операции без блокировок +//! - Автоматическая ребалансировка кластера + +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use tokio::sync::mpsc; +use tokio::time::{interval, Duration}; +use tokio::io::AsyncWriteExt; +use serde::{Serialize, Deserialize}; +use siphasher::sip::SipHasher13; +use crossbeam::queue::SegQueue; +use crossbeam::epoch::{self, Atomic, Owned, Guard}; +use dashmap::{DashMap, DashSet}; + +use crate::common::Result; +use crate::common::protocol; + +/// Состояния узла в Raft протоколе +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum RaftState { + Follower, + Candidate, + Leader, +} + +/// Atomic Raft состояние для атомарных операций +struct AtomicRaftState { + inner: AtomicU64, +} + +impl AtomicRaftState { + fn new() -> Self { + Self { + inner: AtomicU64::new(0), + } + } + + fn get(&self) -> RaftState { + match self.inner.load(Ordering::Acquire) { + 0 => RaftState::Follower, + 1 => RaftState::Candidate, + 2 => RaftState::Leader, + _ => RaftState::Follower, + } + } + + fn set(&self, state: RaftState) { + let value = match state { + RaftState::Follower => 0, + RaftState::Candidate => 1, + RaftState::Leader => 2, + }; + self.inner.store(value, Ordering::Release); + } + + fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result { + let current_val = match current { + RaftState::Follower => 0, + RaftState::Candidate => 1, + RaftState::Leader => 2, + }; + + let new_val = match new { + RaftState::Follower => 0, + RaftState::Candidate => 1, + RaftState::Leader => 2, + }; + + match self.inner.compare_exchange(current_val, new_val, order, Ordering::Relaxed) { + Ok(_) => Ok(new), + Err(actual_val) => { + let actual_state = match actual_val { + 0 => RaftState::Follower, + 1 => RaftState::Candidate, + 2 => RaftState::Leader, + _ => RaftState::Follower, + }; + Err(actual_state) + } + } + } +} + +/// Информация о Raft узле +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RaftNode { + pub node_id: String, + pub address: String, + pub state: RaftState, + pub term: u64, + pub voted_for: Option, + pub last_heartbeat: i64, +} + +/// Информация о шард-узле с Raft +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShardNode { + pub node_id: String, + pub address: String, + pub capacity: u64, + pub used: u64, + pub collections: Vec, + pub raft_info: RaftNode, +} + +/// Состояние шардинга для коллекции +#[derive(Debug, Clone)] +pub struct CollectionSharding { + pub shard_key: String, + pub virtual_nodes: usize, + // Используем Arc для совместного доступа из нескольких потоков + pub ring: Arc>, +} + +/// События репликации +#[derive(Debug, Serialize, Deserialize)] +pub enum ReplicationEvent { + Command(protocol::Command), + SyncRequest, + Heartbeat, + RaftVoteRequest { term: u64, candidate_id: String }, + RaftVoteResponse { term: u64, vote_granted: bool }, + RaftAppendEntries { term: u64, leader_id: String }, +} + +/// Lock-Free очередь репликации на основе SegQueue +struct LockFreeReplicationQueue { + queue: SegQueue, + size: AtomicUsize, +} + +impl LockFreeReplicationQueue { + fn new() -> Self { + Self { + queue: SegQueue::new(), + size: AtomicUsize::new(0), + } + } + + fn push(&self, event: ReplicationEvent) { + self.queue.push(event); + self.size.fetch_add(1, Ordering::SeqCst); + } + + fn pop(&self) -> Option { + let event = self.queue.pop(); + if event.is_some() { + self.size.fetch_sub(1, Ordering::SeqCst); + } + event + } + + fn len(&self) -> usize { + self.size.load(Ordering::Acquire) + } +} + +/// Менеджер шардинга и репликации с Raft +#[derive(Clone)] +pub struct ShardingManager { + // Шардинг компоненты + nodes: Arc>, + // Используем DashMap для атомарного доступа к настройкам шардинга коллекций + collections: Arc>, + virtual_nodes_per_node: usize, + min_nodes_for_cluster: usize, + + // Raft компоненты с atomic операциями + current_term: Arc, + voted_for: Arc>, + is_leader: Arc, + raft_state: Arc, + cluster_formed: Arc, + + // Репликация компоненты + replication_queue: Arc, + sequence_number: Arc, + replication_enabled: Arc, + node_id: String, +} + +impl ShardingManager { + pub fn new( + virtual_nodes_per_node: usize, + replication_enabled: bool, + min_nodes_for_cluster: usize, + node_id: String + ) -> Self { + // Создаем менеджер с начальными настройками + let manager = Self { + nodes: Arc::new(DashMap::new()), + collections: Arc::new(DashMap::new()), + virtual_nodes_per_node, + min_nodes_for_cluster, + current_term: Arc::new(AtomicU64::new(0)), + voted_for: Arc::new(DashMap::new()), + is_leader: Arc::new(AtomicBool::new(false)), + raft_state: Arc::new(AtomicRaftState::new()), + cluster_formed: Arc::new(AtomicBool::new(false)), + replication_queue: Arc::new(LockFreeReplicationQueue::new()), + sequence_number: Arc::new(AtomicU64::new(0)), + replication_enabled: Arc::new(AtomicBool::new(replication_enabled)), + node_id, + }; + + // Добавляем текущий узел в кластер + let _ = manager.add_node( + manager.node_id.clone(), + "127.0.0.1:8081".to_string(), + 1024 * 1024 * 1024 + ); + + // Запускаем фоновый цикл репликации + let manager_clone = manager.clone(); + tokio::spawn(async move { + manager_clone.run_replication_loop().await; + }); + + manager + } + + async fn run_replication_loop(self) { + let mut heartbeat_interval = interval(Duration::from_millis(1000)); + let mut election_timeout = interval(Duration::from_millis(5000)); + + loop { + tokio::select! { + _ = heartbeat_interval.tick() => { + if self.is_leader.load(Ordering::SeqCst) && + self.replication_enabled.load(Ordering::SeqCst) && + self.cluster_formed.load(Ordering::SeqCst) { + let _ = self.send_heartbeat().await; + } + } + _ = election_timeout.tick() => { + if !self.is_leader.load(Ordering::SeqCst) && + self.replication_enabled.load(Ordering::SeqCst) && + self.cluster_formed.load(Ordering::SeqCst) { + let _ = self.start_election(); + } + } + _ = tokio::time::sleep(Duration::from_millis(10)) => { + while let Some(event) = self.replication_queue.pop() { + self.handle_replication_event(event).await; + } + } + } + } + } + + async fn handle_replication_event(&self, event: ReplicationEvent) { + if !self.replication_enabled.load(Ordering::SeqCst) { + return; + } + + match event { + ReplicationEvent::Command(cmd) => { + self.replicate_command(cmd).await; + } + ReplicationEvent::SyncRequest => { + self.sync_with_nodes().await; + } + ReplicationEvent::Heartbeat => { + let _ = self.send_heartbeat().await; + } + ReplicationEvent::RaftVoteRequest { term, candidate_id } => { + self.handle_vote_request(term, candidate_id).await; + } + ReplicationEvent::RaftVoteResponse { term, vote_granted } => { + self.handle_vote_response(term, vote_granted).await; + } + ReplicationEvent::RaftAppendEntries { term, leader_id } => { + self.handle_append_entries(term, leader_id).await; + } + } + } + + async fn replicate_command(&self, command: protocol::Command) { + if !self.cluster_formed.load(Ordering::SeqCst) { + return; + } + + let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); + + let nodes: Vec = self.nodes.iter() + .map(|entry| entry.value().clone()) + .collect(); + + for node in nodes { + if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id { + continue; + } + + let node_addr = node.address.clone(); + let cmd_clone = command.clone(); + let seq_clone = sequence; + + tokio::spawn(async move { + if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { + eprintln!("Failed to replicate to {}: {}", node_addr, e); + } + }); + } + } + + async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { + let mut stream = match tokio::net::TcpStream::connect(node).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("Failed to connect to {}: {}", node, e); + return Ok(()); + } + }; + + let message = protocol::ReplicationMessage { + sequence, + command: command.clone(), + timestamp: chrono::Utc::now().timestamp(), + }; + + let bytes = protocol::serialize(&message)?; + + if let Err(e) = stream.write_all(&bytes).await { + eprintln!("Failed to send command to {}: {}", node, e); + } + + Ok(()) + } + + async fn sync_with_nodes(&self) { + if !self.cluster_formed.load(Ordering::SeqCst) { + return; + } + + let node_count = self.nodes.len(); + println!("Starting sync with {} nodes", node_count); + + let nodes: Vec = self.nodes.iter() + .map(|entry| entry.value().address.clone()) + .collect(); + + for node_addr in nodes { + tokio::spawn(async move { + if let Err(e) = Self::sync_with_node(&node_addr).await { + eprintln!("Failed to sync with {}: {}", node_addr, e); + } + }); + } + } + + async fn sync_with_node(_node: &str) -> Result<()> { + // Заглушка для синхронизации + Ok(()) + } + + async fn send_heartbeat(&self) -> Result<()> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Ok(()); + } + + let nodes: Vec = self.nodes.iter() + .map(|entry| entry.value().clone()) + .collect(); + + for node in nodes { + if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id { + let node_addr = node.address.clone(); + tokio::spawn(async move { + if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await { + eprintln!("Heartbeat failed for {}: {}", node_addr, e); + } + }); + } + } + Ok(()) + } + + async fn send_heartbeat_to_node(node: &str) -> Result<()> { + let mut stream = match tokio::net::TcpStream::connect(node).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("Failed to connect to {} for heartbeat: {}", node, e); + return Ok(()); + } + }; + + let heartbeat = protocol::ReplicationMessage { + sequence: 0, + command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, + timestamp: chrono::Utc::now().timestamp(), + }; + + let bytes = protocol::serialize(&heartbeat)?; + + if let Err(e) = stream.write_all(&bytes).await { + eprintln!("Failed to send heartbeat to {}: {}", node, e); + } + + Ok(()) + } + + async fn handle_vote_request(&self, term: u64, candidate_id: String) { + let current_term = self.current_term.load(Ordering::SeqCst); + + if term > current_term { + self.current_term.store(term, Ordering::SeqCst); + self.voted_for.insert(term, candidate_id); + } + } + + async fn handle_vote_response(&self, term: u64, vote_granted: bool) { + if vote_granted && term == self.current_term.load(Ordering::SeqCst) { + let node_count = self.nodes.len(); + + if node_count >= self.min_nodes_for_cluster { + match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) { + Ok(_) => { + self.is_leader.store(true, Ordering::SeqCst); + println!("Elected as leader for term {}", term); + } + Err(_) => {} + } + } + } + } + + async fn handle_append_entries(&self, term: u64, leader_id: String) { + let current_term = self.current_term.load(Ordering::SeqCst); + + if term >= current_term { + self.current_term.store(term, Ordering::SeqCst); + + match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) { + Ok(_) => { + self.is_leader.store(false, Ordering::SeqCst); + + if let Some(mut node) = self.nodes.get_mut(&self.node_id) { + node.raft_info.state = RaftState::Follower; + node.raft_info.term = term; + node.raft_info.last_heartbeat = chrono::Utc::now().timestamp(); + } + } + Err(_) => {} + } + } + } + + pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { + let raft_node = RaftNode { + node_id: node_id.clone(), + address: address.clone(), + state: RaftState::Follower, + term: 0, + voted_for: None, + last_heartbeat: chrono::Utc::now().timestamp(), + }; + + let node = ShardNode { + node_id: node_id.clone(), + address, + capacity, + used: 0, + collections: Vec::new(), + raft_info: raft_node, + }; + + self.nodes.insert(node_id, node); + + let node_count = self.nodes.len(); + if node_count >= self.min_nodes_for_cluster { + self.cluster_formed.store(true, Ordering::SeqCst); + println!("Cluster formed with {} nodes (minimum required: {})", + node_count, self.min_nodes_for_cluster); + } + + Ok(()) + } + + pub fn remove_node(&self, node_id: &str) -> Result<()> { + self.nodes.remove(node_id); + + let node_count = self.nodes.len(); + if node_count < self.min_nodes_for_cluster { + self.cluster_formed.store(false, Ordering::SeqCst); + self.is_leader.store(false, Ordering::SeqCst); + println!("Cluster no longer formed. Have {} nodes (need {})", + node_count, self.min_nodes_for_cluster); + } + + Ok(()) + } + + pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::ShardingError( + format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.", + self.min_nodes_for_cluster) + )); + } + + let sharding = CollectionSharding { + shard_key: shard_key.to_string(), + virtual_nodes: self.virtual_nodes_per_node, + ring: Arc::new(DashMap::new()), + }; + + self.collections.insert(collection.to_string(), sharding); + + self.rebuild_ring(collection)?; + Ok(()) + } + + fn rebuild_ring(&self, collection: &str) -> Result<()> { + if let Some(mut entry) = self.collections.get_mut(collection) { + let sharding = entry.value_mut(); + + // Очищаем ring + sharding.ring.clear(); + + let nodes: Vec = self.nodes.iter() + .map(|node_entry| node_entry.key().clone()) + .collect(); + + for node_id in nodes { + for i in 0..sharding.virtual_nodes { + let key = format!("{}-{}", node_id, i); + let hash = self.hash_key(&key); + sharding.ring.insert(hash, node_id.clone()); + } + } + } + + Ok(()) + } + + fn hash_key(&self, key: &str) -> u64 { + let mut hasher = SipHasher13::new(); + key.hash(&mut hasher); + hasher.finish() + } + + pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::ShardingError( + format!("Cannot find node: cluster not formed. Need at least {} nodes.", + self.min_nodes_for_cluster) + )); + } + + if let Some(sharding) = self.collections.get(collection) { + let key_hash = self.hash_key(key_value); + + // Ищем ближайший узел в ConcurrentHashMap + // Собираем все записи в вектор + let mut entries: Vec<(u64, String)> = sharding.ring.iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(); + + // Сортируем по хэшу + entries.sort_by_key(|&(hash, _)| hash); + + // Находим первый узел с хэшем >= key_hash + for (hash, node_id) in &entries { + if *hash >= key_hash { + return Ok(Some(node_id.clone())); + } + } + + // Если не нашли, возвращаем первый узел + // Используем итерацию по срезу, чтобы не перемещать вектор + if let Some((_, node_id)) = entries.first() { + return Ok(Some(node_id.clone())); + } + } + + Ok(None) + } + + pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::ShardingError( + format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.", + self.min_nodes_for_cluster) + )); + } + + if !self.nodes.contains_key(from_node) { + return Err(crate::common::FutriixError::ShardingError( + format!("Source node '{}' not found in cluster", from_node) + )); + } + + if !self.nodes.contains_key(to_node) { + return Err(crate::common::FutriixError::ShardingError( + format!("Destination node '{}' not found in cluster", to_node) + )); + } + + println!("Migrating shard for collection '{}' from {} to {} with key {}", + collection, from_node, to_node, shard_key); + + self.rebuild_ring(collection)?; + Ok(()) + } + + pub fn rebalance_cluster(&self) -> Result<()> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::ShardingError( + format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.", + self.min_nodes_for_cluster) + )); + } + + let node_count = self.nodes.len(); + println!("Rebalancing cluster with {} nodes", node_count); + + // Перестраиваем все rings + for key in self.collections.iter().map(|entry| entry.key().clone()).collect::>() { + self.rebuild_ring(&key)?; + } + + self.rebalance_nodes()?; + + Ok(()) + } + + fn rebalance_nodes(&self) -> Result<()> { + println!("Rebalancing nodes in cluster..."); + + let mut total_capacity = 0; + let mut total_used = 0; + let mut nodes_info = Vec::new(); + + for node in self.nodes.iter() { + total_capacity += node.capacity; + total_used += node.used; + nodes_info.push((node.node_id.clone(), node.used, node.capacity)); + } + + let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; + + println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); + + let mut overloaded_nodes = Vec::new(); + let mut underloaded_nodes = Vec::new(); + + for (node_id, used, capacity) in nodes_info { + let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 }; + + if usage > avg_usage * 1.2 { + overloaded_nodes.push((node_id, usage)); + } else if usage < avg_usage * 0.8 { + underloaded_nodes.push((node_id, usage)); + } + } + + println!("Overloaded nodes: {}", overloaded_nodes.len()); + println!("Underloaded nodes: {}", underloaded_nodes.len()); + + Ok(()) + } + + pub fn get_cluster_status(&self) -> Result { + let mut cluster_nodes = Vec::new(); + let mut total_capacity = 0; + let mut total_used = 0; + let mut raft_nodes = Vec::new(); + + for node in self.nodes.iter() { + total_capacity += node.capacity; + total_used += node.used; + + cluster_nodes.push(protocol::ShardInfo { + node_id: node.node_id.clone(), + address: node.address.clone(), + capacity: node.capacity, + used: node.used, + collections: node.collections.clone(), + }); + + raft_nodes.push(protocol::RaftNodeInfo { + node_id: node.node_id.clone(), + address: node.address.clone(), + state: match node.raft_info.state { + RaftState::Leader => "leader".to_string(), + RaftState::Follower => "follower".to_string(), + RaftState::Candidate => "candidate".to_string(), + }, + term: node.raft_info.term, + last_heartbeat: node.raft_info.last_heartbeat, + }); + } + + let rebalance_needed = { + if total_capacity == 0 { + false + } else { + let avg_usage = total_used as f64 / total_capacity as f64; + let mut needs_rebalance = false; + + for node in self.nodes.iter() { + let usage = if node.capacity > 0 { + node.used as f64 / node.capacity as f64 + } else { + 0.0 + }; + + if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 { + needs_rebalance = true; + break; + } + } + + needs_rebalance + } + }; + + Ok(protocol::ClusterStatus { + nodes: cluster_nodes, + total_capacity, + total_used, + rebalance_needed, + cluster_formed: self.cluster_formed.load(Ordering::SeqCst), + leader_exists: self.is_leader.load(Ordering::SeqCst), + raft_nodes, + }) + } + + pub fn get_raft_nodes(&self) -> Vec { + self.nodes.iter() + .map(|node| node.raft_info.clone()) + .collect() + } + + pub fn is_cluster_formed(&self) -> bool { + self.cluster_formed.load(Ordering::SeqCst) + } + + pub fn start_election(&self) -> Result<()> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::ShardingError( + format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) + )); + } + + let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; + println!("Starting election for term {}", new_term); + + self.is_leader.store(false, Ordering::SeqCst); + + match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) { + Ok(_) => { + if let Some(mut node) = self.nodes.get_mut(&self.node_id) { + node.raft_info.state = RaftState::Candidate; + node.raft_info.term = new_term; + node.raft_info.voted_for = Some(self.node_id.clone()); + } + + self.replication_queue.push(ReplicationEvent::RaftVoteRequest { + term: new_term, + candidate_id: self.node_id.clone(), + }); + } + Err(current_state) => { + println!("Already in state {:?}, cannot start election", current_state); + } + } + + Ok(()) + } + + pub async fn replicate(&self, command: protocol::Command) -> Result<()> { + if !self.replication_enabled.load(Ordering::SeqCst) { + return Ok(()); + } + + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::ShardingError( + "Cannot replicate: cluster not formed".to_string() + )); + } + + self.replication_queue.push(ReplicationEvent::Command(command)); + Ok(()) + } + + pub async fn request_sync(&self) -> Result<()> { + if !self.replication_enabled.load(Ordering::SeqCst) { + return Ok(()); + } + + self.replication_queue.push(ReplicationEvent::SyncRequest); + Ok(()) + } + + pub fn get_nodes(&self) -> Vec { + self.nodes.iter() + .map(|node| node.clone()) + .collect() + } + + pub fn get_sequence_number(&self) -> u64 { + self.sequence_number.load(Ordering::SeqCst) + } + + pub fn is_replication_enabled(&self) -> bool { + self.replication_enabled.load(Ordering::SeqCst) + } + + pub fn get_node(&self, node_id: &str) -> Option { + self.nodes.get(node_id).map(|entry| entry.clone()) + } + + pub fn get_node_id(&self) -> &str { + &self.node_id + } +}