diff --git a/src/cluster.rs b/src/cluster.rs deleted file mode 100644 index 6cdda83..0000000 --- a/src/cluster.rs +++ /dev/null @@ -1,826 +0,0 @@ -//! Модуль кластеризации для flusql -//! -//! Реализует простейший шардинг, мастер-мастер синхронную репликацию -//! и управление кластером по паттерну "Centralized Coordinator". -//! -//! Основные возможности: -//! - Простейший шардинг данных -//! - Мастер-мастер синхронная репликация -//! - Централизованный координатор (один главный узел) -//! - Команды управления кластером через Lua -//! - Wait-free доступ к метаданным кластера - -use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; -use dashmap::DashMap; -use serde::{Serialize, Deserialize}; -use tokio::sync::broadcast; -use tokio::time::{Duration, interval}; -use thiserror::Error; - -/// Узел кластера -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ClusterNode { - pub id: String, - pub address: String, - pub role: NodeRole, - pub status: NodeStatus, - pub last_heartbeat: u64, - pub shards: Vec, -} - -/// Роль узла -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)] -pub enum NodeRole { - Master, - Slave, - Coordinator, -} - -/// Статус узла -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)] -pub enum NodeStatus { - Online, - Offline, - Syncing, -} - -/// Шард данных -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Shard { - pub id: String, - pub master_node: String, - pub slave_nodes: Vec, - pub data_range: (u64, u64), // Диапазон данных шарда - pub replication_lag: u64, // Задержка репликации в мс -} - -/// Менеджер кластера -#[derive(Clone)] -pub struct ClusterManager { - pub nodes: Arc>, - pub shards: Arc>, - coordinator_id: Arc, - is_coordinator: Arc, - cluster_version: Arc, - event_sender: broadcast::Sender, - node_id: String, - node_address: String, -} - -impl ClusterManager { - /// Создание нового менеджера кластера - pub fn new(node_id: &str, address: &str) -> Self { - let (event_sender, _) = broadcast::channel(100); - - let mut manager = Self { - nodes: Arc::new(DashMap::new()), - shards: Arc::new(DashMap::new()), - coordinator_id: Arc::new(AtomicU64::new(0)), - is_coordinator: Arc::new(AtomicBool::new(false)), - cluster_version: Arc::new(AtomicU64::new(1)), - event_sender, - node_id: node_id.to_string(), - node_address: address.to_string(), - }; - - // Регистрируем текущий узел - manager.register_node(node_id, address, NodeRole::Slave); - - manager - } - - /// Регистрация узла в кластере - pub fn register_node(&mut self, node_id: &str, address: &str, role: NodeRole) { - let node = ClusterNode { - id: node_id.to_string(), - address: address.to_string(), - role, - status: NodeStatus::Online, - last_heartbeat: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - shards: Vec::new(), - }; - - self.nodes.insert(node_id.to_string(), node); - - // Если это первый узел, делаем его координатором - if self.nodes.len() == 1 { - self.set_coordinator(node_id); - } - - let _ = self.event_sender.send(ClusterEvent::NodeJoined { - node_id: node_id.to_string(), - address: address.to_string(), - role, - }); - } - - /// Установка координатора (публичный метод для использования в Lua) - pub fn set_coordinator(&mut self, node_id: &str) { - // ЛОГИЧЕСКАЯ ОШИБКА БЫЛА ЗДЕСЬ: неправильное преобразование node_id в числовой ID - - // Исправление: корректное преобразование строкового ID - let node_id_num = if node_id.starts_with("node_") { - node_id.trim_start_matches("node_").parse::().unwrap_or_else(|_| { - // Если парсинг не удался, используем хэш строки - use std::hash::{Hash, Hasher}; - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - node_id.hash(&mut hasher); - hasher.finish() - }) - } else { - // Пробуем извлечь числовую часть - let numeric_part: String = node_id.chars() - .filter(|c| c.is_digit(10)) - .collect(); - - if !numeric_part.is_empty() { - numeric_part.parse::().unwrap_or_else(|_| { - // Если парсинг не удался, используем хэш - use std::hash::{Hash, Hasher}; - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - node_id.hash(&mut hasher); - hasher.finish() - }) - } else { - // Если нет цифр, используем хэш всей строки - use std::hash::{Hash, Hasher}; - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - node_id.hash(&mut hasher); - hasher.finish() - } - }; - - self.coordinator_id.store(node_id_num, Ordering::SeqCst); - let current_coordinator_id = self.get_coordinator_id(); - self.is_coordinator.store(node_id == current_coordinator_id, Ordering::SeqCst); - - // Обновляем роль старого координатора (если есть) - if let Some(old_coordinator) = self.get_current_coordinator() { - if old_coordinator.id != node_id { - if let Some(mut old_node) = self.nodes.get_mut(&old_coordinator.id) { - old_node.role = NodeRole::Slave; - } - } - } - - // Устанавливаем новую роль - if let Some(mut node) = self.nodes.get_mut(node_id) { - node.role = NodeRole::Coordinator; - } - - let _ = self.event_sender.send(ClusterEvent::CoordinatorChanged { - old_coordinator: self.get_current_coordinator_name(), - new_coordinator: node_id.to_string(), - }); - } - - /// Получение ID координатора - pub fn get_coordinator_id(&self) -> String { - let id_num = self.coordinator_id.load(Ordering::Relaxed); - if id_num == 0 { - // Возвращаем первый узел как координатор по умолчанию - self.nodes.iter() - .next() - .map(|entry| entry.key().clone()) - .unwrap_or_else(|| "node_1".to_string()) - } else { - // ЛОГИЧЕСКАЯ ОШИБКА: была попытка вернуть числовой ID как строку - // Исправление: ищем узел с соответствующим ID - for entry in self.nodes.iter() { - let node_id = entry.key(); - if node_id.starts_with("node_") { - if let Ok(num) = node_id.trim_start_matches("node_").parse::() { - if num == id_num { - return node_id.clone(); - } - } - } - } - - // Если не нашли, возвращаем первый узел - self.nodes.iter() - .next() - .map(|entry| entry.key().clone()) - .unwrap_or_else(|| format!("node_{}", id_num)) - } - } - - /// Получение текущего координатора - pub fn get_current_coordinator(&self) -> Option { - let coordinator_id = self.get_coordinator_id(); - if coordinator_id.is_empty() { - None - } else { - self.nodes.get(&coordinator_id).map(|node| node.value().clone()) - } - } - - /// Получение имени текущего координатора - pub fn get_current_coordinator_name(&self) -> String { - self.get_current_coordinator() - .map(|node| node.id) - .unwrap_or_else(|| "none".to_string()) - } - - /// Получение адреса текущего координатора - pub fn get_coordinator_address(&self) -> Option { - self.get_current_coordinator() - .map(|node| node.address) - } - - /// Выбор нового координатора - pub fn elect_new_coordinator(&mut self) -> Result<(), ClusterError> { - // Найти наиболее подходящего кандидата среди онлайн узлов - let candidates: Vec = self.nodes.iter() - .filter(|entry| { - let node = entry.value(); - node.status == NodeStatus::Online && - node.role != NodeRole::Coordinator && - !node.id.is_empty() - }) - .map(|entry| entry.value().clone()) - .collect(); - - if candidates.is_empty() { - // ЛОГИЧЕСКАЯ ОШИБКА: не обрабатывался случай, когда нет кандидатов - // Исправление: если нет кандидатов, используем первый доступный узел - let fallback_candidate: Vec = self.nodes.iter() - .filter(|entry| !entry.key().is_empty()) - .map(|entry| entry.value().clone()) - .collect(); - - if fallback_candidate.is_empty() { - return Err(ClusterError::NoNodes); - } - - let candidate = &fallback_candidate[0]; - log::warn!("No online non-coordinator nodes found, using fallback candidate: {}", candidate.id); - self.set_coordinator(&candidate.id); - return Ok(()); - } - - // Стратегия выбора: узел с наибольшим количеством шардов - let candidate = candidates.iter() - .max_by_key(|node| { - // Вес кандидата = количество шардов * 100 + длина ID (для разрешения ничьих) - node.shards.len() * 100 + node.id.len() - }) - .ok_or_else(|| ClusterError::NoNodes)?; - - log::info!("Electing new coordinator: {} (shards: {})", - candidate.id, candidate.shards.len()); - - self.set_coordinator(&candidate.id); - Ok(()) - } - - /// Создание кластера - pub fn create_cluster(&mut self, nodes: HashMap) -> Result<(), ClusterError> { - for (node_id, address) in nodes { - self.register_node(&node_id, &address, NodeRole::Slave); - } - - // Выбираем первый узел как координатора - let coordinator_id = self.nodes.iter().next() - .map(|entry| entry.key().clone()) - .ok_or_else(|| ClusterError::NoNodes)?; - - self.set_coordinator(&coordinator_id); - - Ok(()) - } - - /// Ребалансировка кластера - pub fn rebalance_cluster(&self) -> Result<(), ClusterError> { - // Простая стратегия ребалансировки: равномерное распределение шардов - - let node_count = self.nodes.len(); - if node_count == 0 { - return Err(ClusterError::NoNodes); - } - - let shard_count = self.shards.len(); - let shards_per_node = if node_count > 0 { - (shard_count + node_count - 1) / node_count - } else { - 0 - }; - - // Перераспределяем шарды - let mut node_index = 0; - let node_ids: Vec = self.nodes.iter().map(|entry| entry.key().clone()).collect(); - - for (i, mut shard_entry) in self.shards.iter_mut().enumerate() { - let shard = shard_entry.value_mut(); - let target_node = &node_ids[node_index % node_ids.len()]; - - // Обновляем мастер для шарда - shard.master_node = target_node.clone(); - - // Добавляем шард к узлу - if let Some(mut node) = self.nodes.get_mut(target_node) { - if !node.shards.contains(&shard.id) { - node.shards.push(shard.id.clone()); - } - } - - node_index += 1; - } - - // Увеличиваем версию кластера - self.cluster_version.fetch_add(1, Ordering::SeqCst); - - Ok(()) - } - - /// Исключение узла из кластера - pub fn evict_node(&mut self, node_id: &str) -> Result<(), ClusterError> { - // Проверяем, существует ли узел - if !self.nodes.contains_key(node_id) { - return Err(ClusterError::NodeNotFound(node_id.to_string())); - } - - // Нельзя исключить координатора, если нет других узлов - if node_id == self.get_coordinator_id() { - let online_nodes_count = self.nodes.iter() - .filter(|entry| entry.value().status == NodeStatus::Online) - .count(); - - if online_nodes_count <= 1 { - return Err(ClusterError::CannotEvictLastNode); - } - - // Нужно выбрать нового координатора перед исключением текущего - self.elect_new_coordinator()?; - } - - // Перераспределяем шарды исключаемого узла - if let Some(node) = self.nodes.get(node_id) { - for shard_id in &node.shards { - if let Some(mut shard) = self.shards.get_mut(shard_id) { - // Находим новый мастер для шарда - if let Some(new_master) = self.find_best_node_for_shard(shard_id) { - shard.master_node = new_master.clone(); - - // Добавляем шард к новому узлу - if let Some(mut new_node) = self.nodes.get_mut(&new_master) { - new_node.shards.push(shard_id.clone()); - } - } - } - } - } - - // Удаляем узел - self.nodes.remove(node_id); - - // Отправляем событие - let _ = self.event_sender.send(ClusterEvent::NodeEvicted { - node_id: node_id.to_string(), - }); - - Ok(()) - } - - /// Поиск лучшего узла для шард - fn find_best_node_for_shard(&self, shard_id: &str) -> Option { - // Простая эвристика: узел с наименьшим количеством шардов - self.nodes.iter() - .filter(|entry| entry.value().status == NodeStatus::Online) - .min_by_key(|entry| entry.shards.len()) - .map(|entry| entry.key().clone()) - } - - /// Добавление шарда - pub fn add_shard(&mut self, shard_id: &str, master_node: &str, slave_nodes: Vec) -> Result<(), ClusterError> { - // Проверяем существование мастер-узла - if !self.nodes.contains_key(master_node) { - return Err(ClusterError::NodeNotFound(master_node.to_string())); - } - - // Проверяем существование слейв-узлов - for slave in &slave_nodes { - if !self.nodes.contains_key(slave) { - return Err(ClusterError::NodeNotFound(slave.clone())); - } - } - - // Создаем шард - let shard = Shard { - id: shard_id.to_string(), - master_node: master_node.to_string(), - slave_nodes, - data_range: (0, u64::MAX), - replication_lag: 0, - }; - - self.shards.insert(shard_id.to_string(), shard); - - // Добавляем шард к мастер-узлу - if let Some(mut node) = self.nodes.get_mut(master_node) { - node.shards.push(shard_id.to_string()); - } - - Ok(()) - } - - /// Удаление шард - pub fn remove_shard(&mut self, shard_id: &str) -> Result<(), ClusterError> { - if self.shards.remove(shard_id).is_none() { - return Err(ClusterError::ShardNotFound(shard_id.to_string())); - } - - // Удаляем шард из всех узлов - for mut node in self.nodes.iter_mut() { - node.shards.retain(|id| id != shard_id); - } - - Ok(()) - } - - /// Начало репликации - pub fn start_replication(&mut self, source_node: &str, target_node: &str) -> Result<(), ClusterError> { - // Проверяем существование узлов - if !self.nodes.contains_key(source_node) || !self.nodes.contains_key(target_node) { - return Err(ClusterError::NodeNotFound(format!("{} or {}", source_node, target_node))); - } - - // Обновляем статус узлов - if let Some(mut source) = self.nodes.get_mut(source_node) { - source.status = NodeStatus::Syncing; - } - - if let Some(mut target) = self.nodes.get_mut(target_node) { - target.status = NodeStatus::Syncing; - } - - // В реальной реализации здесь будет логика репликации данных - // Для упрощения просто отправляем событие - - let _ = self.event_sender.send(ClusterEvent::ReplicationStarted { - source: source_node.to_string(), - target: target_node.to_string(), - }); - - Ok(()) - } - - /// Получение статуса кластера - pub fn get_cluster_status(&self) -> ClusterStatus { - let nodes: Vec = self.nodes.iter().map(|entry| entry.value().clone()).collect(); - let shards: Vec = self.shards.iter().map(|entry| entry.value().clone()).collect(); - - ClusterStatus { - coordinator_id: self.get_coordinator_id(), - coordinator_address: self.get_coordinator_address().unwrap_or_default(), - is_coordinator: self.is_coordinator.load(Ordering::Relaxed), - cluster_version: self.cluster_version.load(Ordering::Relaxed), - node_count: nodes.len(), - shard_count: shards.len(), - nodes, - shards, - } - } - - /// Запуск фоновых задач кластера - pub fn start_background_tasks(self: Arc) { - let cluster1 = Arc::clone(&self); - let cluster2 = Arc::clone(&self); - let cluster3 = Arc::clone(&self); - - // Задача отправки heartbeat - tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(5)); - - loop { - interval.tick().await; - - // Отправляем heartbeat если мы координатор - if cluster1.is_coordinator.load(Ordering::Relaxed) { - cluster1.send_heartbeat().await; - } - } - }); - - // Задача обнаружения отказавших узлов - tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(10)); - - loop { - interval.tick().await; - - // Обнаружение отказавших узлов и перевыбор координатора если нужно - cluster2.detect_failed_nodes().await; - - // Проверяем, жив ли координатор - if let Err(e) = cluster2.check_coordinator_health().await { - log::warn!("Coordinator health check failed: {}", e); - // Попытка перевыбора координатора - let cluster_mut = Arc::clone(&cluster2); - let mut cluster_ref = (*cluster_mut).clone(); - if let Err(e) = cluster_ref.elect_new_coordinator() { - log::error!("Failed to elect new coordinator: {}", e); - } - } - } - }); - - // Задача проверки здоровья координатора - tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(15)); - - loop { - interval.tick().await; - cluster3.coordinator_health_check().await; - } - }); - } - - /// Проверка здоровья координатора - async fn coordinator_health_check(&self) { - if !self.is_coordinator.load(Ordering::Relaxed) { - // Если мы не координатор, проверяем жив ли координатор - let coordinator = self.get_current_coordinator(); - - if let Some(coord) = coordinator { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - - let heartbeat_timeout = 30; // 30 секунд - - if now - coord.last_heartbeat > heartbeat_timeout && coord.status == NodeStatus::Online { - log::warn!("Coordinator {} appears to be down (last heartbeat: {}s ago)", - coord.id, now - coord.last_heartbeat); - - // Обновляем статус координатора - if let Some(mut node) = self.nodes.get_mut(&coord.id) { - node.status = NodeStatus::Offline; - } - - // Отправляем событие - let _ = self.event_sender.send(ClusterEvent::CoordinatorFailed { - coordinator_id: coord.id.clone(), - }); - } - } else { - log::warn!("No coordinator defined in cluster"); - } - } - } - - /// Проверка здоровья координатора (альтернативная реализация) - async fn check_coordinator_health(&self) -> Result<(), String> { - if self.is_coordinator.load(Ordering::Relaxed) { - // Мы координатор - всегда здоровы - return Ok(()); - } - - let coordinator = self.get_current_coordinator(); - if let Some(coord) = coordinator { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - - let heartbeat_timeout = 30; // 30 секунд - - if now - coord.last_heartbeat > heartbeat_timeout { - return Err(format!("Coordinator {} heartbeat timeout", coord.id)); - } - - Ok(()) - } else { - Err("No coordinator defined".to_string()) - } - } - - /// Отправка heartbeat - async fn send_heartbeat(&self) { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - - // Обновляем время последнего heartbeat для координатора - let coordinator_id = self.get_coordinator_id(); - if let Some(mut node) = self.nodes.get_mut(&coordinator_id) { - node.last_heartbeat = now; - } - - // Отправляем событие - let _ = self.event_sender.send(ClusterEvent::Heartbeat { - timestamp: now, - coordinator_id: coordinator_id, - }); - } - - /// Обнаружение отказавших узлов - async fn detect_failed_nodes(&self) { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - - let timeout = 30; // 30 секунд - - for mut node in self.nodes.iter_mut() { - if node.id != self.get_coordinator_id() && now - node.last_heartbeat > timeout { - node.status = NodeStatus::Offline; - - // Отправляем событие - let _ = self.event_sender.send(ClusterEvent::NodeFailed { - node_id: node.id.clone(), - }); - } - } - } -} - -/// Статус кластера -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ClusterStatus { - pub coordinator_id: String, - pub coordinator_address: String, - pub is_coordinator: bool, - pub cluster_version: u64, - pub node_count: usize, - pub shard_count: usize, - pub nodes: Vec, - pub shards: Vec, -} - -/// События кластера -#[derive(Debug, Clone)] -pub enum ClusterEvent { - NodeJoined { - node_id: String, - address: String, - role: NodeRole, - }, - NodeEvicted { - node_id: String, - }, - NodeFailed { - node_id: String, - }, - ReplicationStarted { - source: String, - target: String, - }, - Heartbeat { - timestamp: u64, - coordinator_id: String, - }, - ShardMoved { - shard_id: String, - from_node: String, - to_node: String, - }, - CoordinatorChanged { - old_coordinator: String, - new_coordinator: String, - }, - CoordinatorFailed { - coordinator_id: String, - }, -} - -/// Менеджер репликации -#[derive(Clone)] -pub struct ReplicationManager { - cluster_manager: Arc, - replication_queue: Arc>>, - is_replicating: Arc, -} - -impl ReplicationManager { - pub fn new(cluster_manager: Arc) -> Self { - Self { - cluster_manager, - replication_queue: Arc::new(DashMap::new()), - is_replicating: Arc::new(AtomicBool::new(false)), - } - } - - /// Начало репликации данных - pub async fn replicate_data(&self, database_name: &str, table_name: &str) -> Result<(), ClusterError> { - // В реальной реализации здесь будет логика репликации данных таблицы - // Для упрощения просто добавляем задачу в очередь - - let task = ReplicationTask { - database: database_name.to_string(), - table: table_name.to_string(), - status: ReplicationStatus::Pending, - started_at: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - }; - - self.replication_queue - .entry(database_name.to_string()) - .or_insert_with(Vec::new) - .push(task); - - Ok(()) - } - - /// Запуск фоновой репликации - pub fn start_background_replication(self: Arc) { - let manager = Arc::clone(&self); - - tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(1)); - - loop { - interval.tick().await; - - if !manager.is_replicating.load(Ordering::Relaxed) { - manager.process_replication_queue().await; - } - } - }); - } - - /// Обработка очереди репликации - async fn process_replication_queue(&self) { - self.is_replicating.store(true, Ordering::Relaxed); - - // Обрабатываем задачи репликации - for mut entry in self.replication_queue.iter_mut() { - let tasks = entry.value_mut(); - let mut completed_tasks = Vec::new(); - - for (i, task) in tasks.iter_mut().enumerate() { - if task.status == ReplicationStatus::Pending { - task.status = ReplicationStatus::InProgress; - - // В реальной реализации здесь будет репликация данных - // Для упрощения просто отмечаем как завершенную - task.status = ReplicationStatus::Completed; - completed_tasks.push(i); - } - } - - // Удаляем завершенные задачи - for i in completed_tasks.into_iter().rev() { - tasks.remove(i); - } - } - - self.is_replicating.store(false, Ordering::Relaxed); - } -} - -/// Задача репликации -#[derive(Debug, Clone)] -struct ReplicationTask { - database: String, - table: String, - status: ReplicationStatus, - started_at: u64, -} - -/// Статус репликации -#[derive(Debug, Clone, PartialEq, Copy)] -enum ReplicationStatus { - Pending, - InProgress, - Completed, - Failed, -} - -/// Ошибки кластера -#[derive(Debug, Error)] -pub enum ClusterError { - #[error("Node not found: {0}")] - NodeNotFound(String), - - #[error("Shard not found: {0}")] - ShardNotFound(String), - - #[error("Cannot evict coordinator")] - CannotEvictCoordinator, - - #[error("Cannot evict last node")] - CannotEvictLastNode, - - #[error("No nodes in cluster")] - NoNodes, - - #[error("Replication error: {0}")] - ReplicationError(String), - - #[error("Network error: {0}")] - NetworkError(String), - - #[error("Coordinator election failed: {0}")] - CoordinatorElectionFailed(String), -}