//! Модуль кластеризации для flusql //! //! Реализует простейший шардинг, мастер-мастер синхронную репликацию //! и управление кластером по паттерну "Centralized Coordinator". //! //! Основные возможности: //! - Простейший шардинг данных //! - Мастер-мастер синхронная репликация //! - Централизованный координатор (один главный узел) //! - Команды управления кластером через Lua //! - Wait-free доступ к метаданным кластера use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use dashmap::DashMap; use serde::{Serialize, Deserialize}; use tokio::sync::broadcast; use tokio::time::{Duration, interval}; use thiserror::Error; /// Узел кластера #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClusterNode { pub id: String, pub address: String, pub role: NodeRole, pub status: NodeStatus, pub last_heartbeat: u64, pub shards: Vec, } /// Роль узла #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)] pub enum NodeRole { Master, Slave, Coordinator, } /// Статус узла #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)] pub enum NodeStatus { Online, Offline, Syncing, } /// Шард данных #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Shard { pub id: String, pub master_node: String, pub slave_nodes: Vec, pub data_range: (u64, u64), // Диапазон данных шарда pub replication_lag: u64, // Задержка репликации в мс } /// Менеджер кластера #[derive(Clone)] pub struct ClusterManager { pub nodes: Arc>, pub shards: Arc>, coordinator_id: Arc, is_coordinator: Arc, cluster_version: Arc, event_sender: broadcast::Sender, node_id: String, node_address: String, } impl ClusterManager { /// Создание нового менеджера кластера pub fn new(node_id: &str, address: &str) -> Self { let (event_sender, _) = broadcast::channel(100); let mut manager = Self { nodes: Arc::new(DashMap::new()), shards: Arc::new(DashMap::new()), coordinator_id: Arc::new(AtomicU64::new(0)), is_coordinator: Arc::new(AtomicBool::new(false)), cluster_version: Arc::new(AtomicU64::new(1)), event_sender, node_id: node_id.to_string(), node_address: address.to_string(), }; // Регистрируем текущий узел manager.register_node(node_id, address, NodeRole::Slave); manager } /// Регистрация узла в кластере pub fn register_node(&mut self, node_id: &str, address: &str, role: NodeRole) { let node = ClusterNode { id: node_id.to_string(), address: address.to_string(), role, status: NodeStatus::Online, last_heartbeat: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(), shards: Vec::new(), }; self.nodes.insert(node_id.to_string(), node); // Если это первый узел, делаем его координатором if self.nodes.len() == 1 { self.set_coordinator(node_id); } let _ = self.event_sender.send(ClusterEvent::NodeJoined { node_id: node_id.to_string(), address: address.to_string(), role, }); } /// Установка координатора (публичный метод для использования в Lua) pub fn set_coordinator(&mut self, node_id: &str) { // Исправление: храним строковый ID координатора, а не числовой // Преобразуем строку в хэш для внутреннего использования let coordinator_hash = self.hash_node_id(node_id); self.coordinator_id.store(coordinator_hash, Ordering::SeqCst); // Обновляем флаг is_coordinator для текущего узла let is_current_node_coordinator = node_id == self.node_id; self.is_coordinator.store(is_current_node_coordinator, Ordering::SeqCst); // Обновляем роль старого координатора (если есть) if let Some(old_coordinator) = self.get_current_coordinator() { if old_coordinator.id != node_id { if let Some(mut old_node) = self.nodes.get_mut(&old_coordinator.id) { old_node.role = NodeRole::Slave; } } } // Устанавливаем новую роль if let Some(mut node) = self.nodes.get_mut(node_id) { node.role = NodeRole::Coordinator; } let _ = self.event_sender.send(ClusterEvent::CoordinatorChanged { old_coordinator: self.get_current_coordinator_name(), new_coordinator: node_id.to_string(), }); } /// Хэширование ID узла для внутреннего использования fn hash_node_id(&self, node_id: &str) -> u64 { use std::hash::{Hash, Hasher}; let mut hasher = std::collections::hash_map::DefaultHasher::new(); node_id.hash(&mut hasher); hasher.finish() } /// Получение ID координатора (строковый идентификатор) pub fn get_coordinator_id(&self) -> String { // Возвращаем имя узла-координатора self.get_current_coordinator_name() } /// Получение текущего координатора pub fn get_current_coordinator(&self) -> Option { // Ищем узел с ролью Coordinator for entry in self.nodes.iter() { let node = entry.value(); if node.role == NodeRole::Coordinator { return Some(node.clone()); } } // Если не нашли координатора, выбираем первый доступный узел self.nodes.iter() .next() .map(|entry| entry.value().clone()) } /// Получение имени текущего координатора pub fn get_current_coordinator_name(&self) -> String { self.get_current_coordinator() .map(|node| node.id) .unwrap_or_else(|| "none".to_string()) } /// Получение адреса текущего координатора pub fn get_coordinator_address(&self) -> Option { self.get_current_coordinator() .map(|node| node.address) } /// Выбор нового координатора pub fn elect_new_coordinator(&mut self) -> Result<(), ClusterError> { // Найти наиболее подходящего кандидата среди онлайн узлов let candidates: Vec = self.nodes.iter() .filter(|entry| { let node = entry.value(); node.status == NodeStatus::Online && node.role != NodeRole::Coordinator && !node.id.is_empty() }) .map(|entry| entry.value().clone()) .collect(); if candidates.is_empty() { // Исправление: обрабатываем случай, когда нет кандидатов // Ищем любой онлайн узел, включая текущего координатора let fallback_candidate: Vec = self.nodes.iter() .filter(|entry| { let node = entry.value(); node.status == NodeStatus::Online && !node.id.is_empty() }) .map(|entry| entry.value().clone()) .collect(); if fallback_candidate.is_empty() { return Err(ClusterError::NoNodes); } let candidate = &fallback_candidate[0]; log::warn!("No online non-coordinator nodes found, using fallback candidate: {}", candidate.id); self.set_coordinator(&candidate.id); return Ok(()); } // Стратегия выбора: узел с наибольшим количеством шардов let candidate = candidates.iter() .max_by_key(|node| { // Вес кандидата = количество шардов * 100 + длина ID (для разрешения ничьих) node.shards.len() * 100 + node.id.len() }) .ok_or_else(|| ClusterError::NoNodes)?; log::info!("Electing new coordinator: {} (shards: {})", candidate.id, candidate.shards.len()); self.set_coordinator(&candidate.id); Ok(()) } /// Создание кластера pub fn create_cluster(&mut self, nodes: HashMap) -> Result<(), ClusterError> { for (node_id, address) in nodes { self.register_node(&node_id, &address, NodeRole::Slave); } // Выбираем первый узел как координатора let coordinator_id = self.nodes.iter().next() .map(|entry| entry.key().clone()) .ok_or_else(|| ClusterError::NoNodes)?; self.set_coordinator(&coordinator_id); Ok(()) } /// Ребалансировка кластера pub fn rebalance_cluster(&self) -> Result<(), ClusterError> { // Простая стратегия ребалансировки: равномерное распределение шардов let node_count = self.nodes.len(); if node_count == 0 { return Err(ClusterError::NoNodes); } let shard_count = self.shards.len(); let shards_per_node = if node_count > 0 { (shard_count + node_count - 1) / node_count } else { 0 }; // Перераспределяем шарды let mut node_index = 0; let node_ids: Vec = self.nodes.iter().map(|entry| entry.key().clone()).collect(); for (i, mut shard_entry) in self.shards.iter_mut().enumerate() { let shard = shard_entry.value_mut(); let target_node = &node_ids[node_index % node_ids.len()]; // Обновляем мастер для шарда shard.master_node = target_node.clone(); // Добавляем шард к узлу if let Some(mut node) = self.nodes.get_mut(target_node) { if !node.shards.contains(&shard.id) { node.shards.push(shard.id.clone()); } } node_index += 1; } // Увеличиваем версию кластера self.cluster_version.fetch_add(1, Ordering::SeqCst); Ok(()) } /// Исключение узла из кластера pub fn evict_node(&mut self, node_id: &str) -> Result<(), ClusterError> { // Проверяем, существует ли узел if !self.nodes.contains_key(node_id) { return Err(ClusterError::NodeNotFound(node_id.to_string())); } // Нельзя исключить координатора, если нет других узлов if node_id == self.get_coordinator_id() { let online_nodes_count = self.nodes.iter() .filter(|entry| entry.value().status == NodeStatus::Online) .count(); if online_nodes_count <= 1 { return Err(ClusterError::CannotEvictLastNode); } // Нужно выбрать нового координатора перед исключением текущего self.elect_new_coordinator()?; } // Перераспределяем шарды исключаемого узла if let Some(node) = self.nodes.get(node_id) { for shard_id in &node.shards { if let Some(mut shard) = self.shards.get_mut(shard_id) { // Находим новый мастер для шарда if let Some(new_master) = self.find_best_node_for_shard(shard_id) { shard.master_node = new_master.clone(); // Добавляем шард к новому узлу if let Some(mut new_node) = self.nodes.get_mut(&new_master) { new_node.shards.push(shard_id.clone()); } } } } } // Удаляем узел self.nodes.remove(node_id); // Отправляем событие let _ = self.event_sender.send(ClusterEvent::NodeEvicted { node_id: node_id.to_string(), }); Ok(()) } /// Поиск лучшего узла для шард fn find_best_node_for_shard(&self, shard_id: &str) -> Option { // Простая эвристика: узел с наименьшим количеством шардов self.nodes.iter() .filter(|entry| entry.value().status == NodeStatus::Online) .min_by_key(|entry| entry.shards.len()) .map(|entry| entry.key().clone()) } /// Добавление шарда pub fn add_shard(&mut self, shard_id: &str, master_node: &str, slave_nodes: Vec) -> Result<(), ClusterError> { // Проверяем существование мастер-узла if !self.nodes.contains_key(master_node) { return Err(ClusterError::NodeNotFound(master_node.to_string())); } // Проверяем существование слейв-узлов for slave in &slave_nodes { if !self.nodes.contains_key(slave) { return Err(ClusterError::NodeNotFound(slave.clone())); } } // Создаем шард let shard = Shard { id: shard_id.to_string(), master_node: master_node.to_string(), slave_nodes, data_range: (0, u64::MAX), replication_lag: 0, }; self.shards.insert(shard_id.to_string(), shard); // Добавляем шард к мастер-узлу if let Some(mut node) = self.nodes.get_mut(master_node) { node.shards.push(shard_id.to_string()); } Ok(()) } /// Удаление шард pub fn remove_shard(&mut self, shard_id: &str) -> Result<(), ClusterError> { if self.shards.remove(shard_id).is_none() { return Err(ClusterError::ShardNotFound(shard_id.to_string())); } // Удаляем шард из всех узлов for mut node in self.nodes.iter_mut() { node.shards.retain(|id| id != shard_id); } Ok(()) } /// Начало репликации pub fn start_replication(&mut self, source_node: &str, target_node: &str) -> Result<(), ClusterError> { // Проверяем существование узлов if !self.nodes.contains_key(source_node) || !self.nodes.contains_key(target_node) { return Err(ClusterError::NodeNotFound(format!("{} or {}", source_node, target_node))); } // Обновляем статус узлов if let Some(mut source) = self.nodes.get_mut(source_node) { source.status = NodeStatus::Syncing; } if let Some(mut target) = self.nodes.get_mut(target_node) { target.status = NodeStatus::Syncing; } // В реальной реализации здесь будет логика репликации данных // Для упрощения просто отправляем событие let _ = self.event_sender.send(ClusterEvent::ReplicationStarted { source: source_node.to_string(), target: target_node.to_string(), }); Ok(()) } /// Получение статуса кластера pub fn get_cluster_status(&self) -> ClusterStatus { let nodes: Vec = self.nodes.iter().map(|entry| entry.value().clone()).collect(); let shards: Vec = self.shards.iter().map(|entry| entry.value().clone()).collect(); ClusterStatus { coordinator_id: self.get_coordinator_id(), coordinator_address: self.get_coordinator_address().unwrap_or_default(), is_coordinator: self.is_coordinator.load(Ordering::Relaxed), cluster_version: self.cluster_version.load(Ordering::Relaxed), node_count: nodes.len(), shard_count: shards.len(), nodes, shards, } } /// Запуск фоновых задач кластера pub fn start_background_tasks(self: Arc) { let cluster1 = Arc::clone(&self); let cluster2 = Arc::clone(&self); let cluster3 = Arc::clone(&self); // Задача отправки heartbeat tokio::spawn(async move { let mut interval = interval(Duration::from_secs(5)); loop { interval.tick().await; // Отправляем heartbeat если мы координатор if cluster1.is_coordinator.load(Ordering::Relaxed) { cluster1.send_heartbeat().await; } } }); // Задача обнаружения отказавших узлов tokio::spawn(async move { let mut interval = interval(Duration::from_secs(10)); loop { interval.tick().await; // Обнаружение отказавших узлов и перевыбор координатора если нужно cluster2.detect_failed_nodes().await; // Проверяем, жив ли координатор if let Err(e) = cluster2.check_coordinator_health().await { log::warn!("Coordinator health check failed: {}", e); // Попытка перевыбора координатора let cluster_mut = Arc::clone(&cluster2); let mut cluster_ref = (*cluster_mut).clone(); if let Err(e) = cluster_ref.elect_new_coordinator() { log::error!("Failed to elect new coordinator: {}", e); } } } }); // Задача проверки здоровья координатора tokio::spawn(async move { let mut interval = interval(Duration::from_secs(15)); loop { interval.tick().await; cluster3.coordinator_health_check().await; } }); } /// Проверка здоровья координатора async fn coordinator_health_check(&self) { if !self.is_coordinator.load(Ordering::Relaxed) { // Если мы не координатор, проверяем жив ли координатор let coordinator = self.get_current_coordinator(); if let Some(coord) = coordinator { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); let heartbeat_timeout = 30; // 30 секунд if now - coord.last_heartbeat > heartbeat_timeout && coord.status == NodeStatus::Online { log::warn!("Coordinator {} appears to be down (last heartbeat: {}s ago)", coord.id, now - coord.last_heartbeat); // Обновляем статус координатора if let Some(mut node) = self.nodes.get_mut(&coord.id) { node.status = NodeStatus::Offline; } // Отправляем событие let _ = self.event_sender.send(ClusterEvent::CoordinatorFailed { coordinator_id: coord.id.clone(), }); } } else { log::warn!("No coordinator defined in cluster"); } } } /// Проверка здоровья координатора (альтернативная реализация) async fn check_coordinator_health(&self) -> Result<(), String> { if self.is_coordinator.load(Ordering::Relaxed) { // Мы координатор - всегда здоровы return Ok(()); } let coordinator = self.get_current_coordinator(); if let Some(coord) = coordinator { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); let heartbeat_timeout = 30; // 30 секунд if now - coord.last_heartbeat > heartbeat_timeout { return Err(format!("Coordinator {} heartbeat timeout", coord.id)); } Ok(()) } else { Err("No coordinator defined".to_string()) } } /// Отправка heartbeat async fn send_heartbeat(&self) { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); // Обновляем время последнего heartbeat для координатора let coordinator_id = self.get_coordinator_id(); if let Some(mut node) = self.nodes.get_mut(&coordinator_id) { node.last_heartbeat = now; } // Отправляем событие let _ = self.event_sender.send(ClusterEvent::Heartbeat { timestamp: now, coordinator_id: coordinator_id, }); } /// Обнаружение отказавших узлов async fn detect_failed_nodes(&self) { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); let timeout = 30; // 30 секунд for mut node in self.nodes.iter_mut() { if node.id != self.get_coordinator_id() && now - node.last_heartbeat > timeout { node.status = NodeStatus::Offline; // Отправляем событие let _ = self.event_sender.send(ClusterEvent::NodeFailed { node_id: node.id.clone(), }); } } } } /// Статус кластера #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClusterStatus { pub coordinator_id: String, pub coordinator_address: String, pub is_coordinator: bool, pub cluster_version: u64, pub node_count: usize, pub shard_count: usize, pub nodes: Vec, pub shards: Vec, } /// События кластера #[derive(Debug, Clone)] pub enum ClusterEvent { NodeJoined { node_id: String, address: String, role: NodeRole, }, NodeEvicted { node_id: String, }, NodeFailed { node_id: String, }, ReplicationStarted { source: String, target: String, }, Heartbeat { timestamp: u64, coordinator_id: String, }, ShardMoved { shard_id: String, from_node: String, to_node: String, }, CoordinatorChanged { old_coordinator: String, new_coordinator: String, }, CoordinatorFailed { coordinator_id: String, }, } /// Менеджер репликации #[derive(Clone)] pub struct ReplicationManager { cluster_manager: Arc, replication_queue: Arc>>, is_replicating: Arc, } impl ReplicationManager { pub fn new(cluster_manager: Arc) -> Self { Self { cluster_manager, replication_queue: Arc::new(DashMap::new()), is_replicating: Arc::new(AtomicBool::new(false)), } } /// Начало репликации данных pub async fn replicate_data(&self, database_name: &str, table_name: &str) -> Result<(), ClusterError> { // В реальной реализации здесь будет логика репликации данных таблицы // Для упрощения просто добавляем задачу в очередь let task = ReplicationTask { database: database_name.to_string(), table: table_name.to_string(), status: ReplicationStatus::Pending, started_at: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(), }; self.replication_queue .entry(database_name.to_string()) .or_insert_with(Vec::new) .push(task); Ok(()) } /// Запуск фоновой репликации pub fn start_background_replication(self: Arc) { let manager = Arc::clone(&self); tokio::spawn(async move { let mut interval = interval(Duration::from_secs(1)); loop { interval.tick().await; if !manager.is_replicating.load(Ordering::Relaxed) { manager.process_replication_queue().await; } } }); } /// Обработка очереди репликации async fn process_replication_queue(&self) { self.is_replicating.store(true, Ordering::Relaxed); // Обрабатываем задачи репликации for mut entry in self.replication_queue.iter_mut() { let tasks = entry.value_mut(); let mut completed_tasks = Vec::new(); for (i, task) in tasks.iter_mut().enumerate() { if task.status == ReplicationStatus::Pending { task.status = ReplicationStatus::InProgress; // В реальной реализации здесь будет репликация данных // Для упрощения просто отмечаем как завершенную task.status = ReplicationStatus::Completed; completed_tasks.push(i); } } // Удаляем завершенные задачи for i in completed_tasks.into_iter().rev() { tasks.remove(i); } } self.is_replicating.store(false, Ordering::Relaxed); } } /// Задача репликации #[derive(Debug, Clone)] struct ReplicationTask { database: String, table: String, status: ReplicationStatus, started_at: u64, } /// Статус репликации #[derive(Debug, Clone, PartialEq, Copy)] enum ReplicationStatus { Pending, InProgress, Completed, Failed, } /// Ошибки кластера #[derive(Debug, Error)] pub enum ClusterError { #[error("Node not found: {0}")] NodeNotFound(String), #[error("Shard not found: {0}")] ShardNotFound(String), #[error("Cannot evict coordinator")] CannotEvictCoordinator, #[error("Cannot evict last node")] CannotEvictLastNode, #[error("No nodes in cluster")] NoNodes, #[error("Replication error: {0}")] ReplicationError(String), #[error("Network error: {0}")] NetworkError(String), #[error("Coordinator election failed: {0}")] CoordinatorElectionFailed(String), }