From 7aea95abe964b1e0df61f6639836a042b7977a34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Wed, 14 Jan 2026 20:57:57 +0000 Subject: [PATCH] Upload files to "src" --- src/cluster.rs | 789 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 789 insertions(+) create mode 100644 src/cluster.rs diff --git a/src/cluster.rs b/src/cluster.rs new file mode 100644 index 0000000..6a20953 --- /dev/null +++ b/src/cluster.rs @@ -0,0 +1,789 @@ +//! Модуль кластеризации для flusql +//! +//! Реализует простейший шардинг, мастер-мастер синхронную репликацию +//! и управление кластером по паттерну "Centralized Coordinator". +//! +//! Основные возможности: +//! - Простейший шардинг данных +//! - Мастер-мастер синхронная репликация +//! - Централизованный координатор (один главный узел) +//! - Команды управления кластером через Lua +//! - Wait-free доступ к метаданным кластера + +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use dashmap::DashMap; +use serde::{Serialize, Deserialize}; +use tokio::sync::broadcast; +use tokio::time::{Duration, interval}; +use thiserror::Error; + +/// Узел кластера +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterNode { + pub id: String, + pub address: String, + pub role: NodeRole, + pub status: NodeStatus, + pub last_heartbeat: u64, + pub shards: Vec, +} + +/// Роль узла +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)] +pub enum NodeRole { + Master, + Slave, + Coordinator, +} + +/// Статус узла +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)] +pub enum NodeStatus { + Online, + Offline, + Syncing, +} + +/// Шард данных +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Shard { + pub id: String, + pub master_node: String, + pub slave_nodes: Vec, + pub data_range: (u64, u64), // Диапазон данных шарда + pub replication_lag: u64, // Задержка репликации в мс +} + +/// Менеджер кластера +#[derive(Clone)] +pub struct ClusterManager { + pub nodes: Arc>, + pub shards: Arc>, + coordinator_id: Arc, + is_coordinator: Arc, + cluster_version: Arc, + event_sender: broadcast::Sender, + node_id: String, + node_address: String, +} + +impl ClusterManager { + /// Создание нового менеджера кластера + pub fn new(node_id: &str, address: &str) -> Self { + let (event_sender, _) = broadcast::channel(100); + + let mut manager = Self { + nodes: Arc::new(DashMap::new()), + shards: Arc::new(DashMap::new()), + coordinator_id: Arc::new(AtomicU64::new(0)), + is_coordinator: Arc::new(AtomicBool::new(false)), + cluster_version: Arc::new(AtomicU64::new(1)), + event_sender, + node_id: node_id.to_string(), + node_address: address.to_string(), + }; + + // Регистрируем текущий узел + manager.register_node(node_id, address, NodeRole::Slave); + + manager + } + + /// Регистрация узла в кластере + pub fn register_node(&mut self, node_id: &str, address: &str, role: NodeRole) { + let node = ClusterNode { + id: node_id.to_string(), + address: address.to_string(), + role, + status: NodeStatus::Online, + last_heartbeat: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + shards: Vec::new(), + }; + + self.nodes.insert(node_id.to_string(), node); + + // Если это первый узел, делаем его координатором + if self.nodes.len() == 1 { + self.set_coordinator(node_id); + } + + let _ = self.event_sender.send(ClusterEvent::NodeJoined { + node_id: node_id.to_string(), + address: address.to_string(), + role, + }); + } + + /// Установка координатора (публичный метод для использования в Lua) + pub fn set_coordinator(&mut self, node_id: &str) { + // Исправление: храним строковый ID координатора, а не числовой + // Преобразуем строку в хэш для внутреннего использования + let coordinator_hash = self.hash_node_id(node_id); + self.coordinator_id.store(coordinator_hash, Ordering::SeqCst); + + // Обновляем флаг is_coordinator для текущего узла + let is_current_node_coordinator = node_id == self.node_id; + self.is_coordinator.store(is_current_node_coordinator, Ordering::SeqCst); + + // Обновляем роль старого координатора (если есть) + if let Some(old_coordinator) = self.get_current_coordinator() { + if old_coordinator.id != node_id { + if let Some(mut old_node) = self.nodes.get_mut(&old_coordinator.id) { + old_node.role = NodeRole::Slave; + } + } + } + + // Устанавливаем новую роль + if let Some(mut node) = self.nodes.get_mut(node_id) { + node.role = NodeRole::Coordinator; + } + + let _ = self.event_sender.send(ClusterEvent::CoordinatorChanged { + old_coordinator: self.get_current_coordinator_name(), + new_coordinator: node_id.to_string(), + }); + } + + /// Хэширование ID узла для внутреннего использования + fn hash_node_id(&self, node_id: &str) -> u64 { + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + node_id.hash(&mut hasher); + hasher.finish() + } + + /// Получение ID координатора (строковый идентификатор) + pub fn get_coordinator_id(&self) -> String { + // Возвращаем имя узла-координатора + self.get_current_coordinator_name() + } + + /// Получение текущего координатора + pub fn get_current_coordinator(&self) -> Option { + // Ищем узел с ролью Coordinator + for entry in self.nodes.iter() { + let node = entry.value(); + if node.role == NodeRole::Coordinator { + return Some(node.clone()); + } + } + + // Если не нашли координатора, выбираем первый доступный узел + self.nodes.iter() + .next() + .map(|entry| entry.value().clone()) + } + + /// Получение имени текущего координатора + pub fn get_current_coordinator_name(&self) -> String { + self.get_current_coordinator() + .map(|node| node.id) + .unwrap_or_else(|| "none".to_string()) + } + + /// Получение адреса текущего координатора + pub fn get_coordinator_address(&self) -> Option { + self.get_current_coordinator() + .map(|node| node.address) + } + + /// Выбор нового координатора + pub fn elect_new_coordinator(&mut self) -> Result<(), ClusterError> { + // Найти наиболее подходящего кандидата среди онлайн узлов + let candidates: Vec = self.nodes.iter() + .filter(|entry| { + let node = entry.value(); + node.status == NodeStatus::Online && + node.role != NodeRole::Coordinator && + !node.id.is_empty() + }) + .map(|entry| entry.value().clone()) + .collect(); + + if candidates.is_empty() { + // Исправление: обрабатываем случай, когда нет кандидатов + // Ищем любой онлайн узел, включая текущего координатора + let fallback_candidate: Vec = self.nodes.iter() + .filter(|entry| { + let node = entry.value(); + node.status == NodeStatus::Online && !node.id.is_empty() + }) + .map(|entry| entry.value().clone()) + .collect(); + + if fallback_candidate.is_empty() { + return Err(ClusterError::NoNodes); + } + + let candidate = &fallback_candidate[0]; + log::warn!("No online non-coordinator nodes found, using fallback candidate: {}", candidate.id); + self.set_coordinator(&candidate.id); + return Ok(()); + } + + // Стратегия выбора: узел с наибольшим количеством шардов + let candidate = candidates.iter() + .max_by_key(|node| { + // Вес кандидата = количество шардов * 100 + длина ID (для разрешения ничьих) + node.shards.len() * 100 + node.id.len() + }) + .ok_or_else(|| ClusterError::NoNodes)?; + + log::info!("Electing new coordinator: {} (shards: {})", + candidate.id, candidate.shards.len()); + + self.set_coordinator(&candidate.id); + Ok(()) + } + + /// Создание кластера + pub fn create_cluster(&mut self, nodes: HashMap) -> Result<(), ClusterError> { + for (node_id, address) in nodes { + self.register_node(&node_id, &address, NodeRole::Slave); + } + + // Выбираем первый узел как координатора + let coordinator_id = self.nodes.iter().next() + .map(|entry| entry.key().clone()) + .ok_or_else(|| ClusterError::NoNodes)?; + + self.set_coordinator(&coordinator_id); + + Ok(()) + } + + /// Ребалансировка кластера + pub fn rebalance_cluster(&self) -> Result<(), ClusterError> { + // Простая стратегия ребалансировки: равномерное распределение шардов + + let node_count = self.nodes.len(); + if node_count == 0 { + return Err(ClusterError::NoNodes); + } + + let shard_count = self.shards.len(); + let shards_per_node = if node_count > 0 { + (shard_count + node_count - 1) / node_count + } else { + 0 + }; + + // Перераспределяем шарды + let mut node_index = 0; + let node_ids: Vec = self.nodes.iter().map(|entry| entry.key().clone()).collect(); + + for (i, mut shard_entry) in self.shards.iter_mut().enumerate() { + let shard = shard_entry.value_mut(); + let target_node = &node_ids[node_index % node_ids.len()]; + + // Обновляем мастер для шарда + shard.master_node = target_node.clone(); + + // Добавляем шард к узлу + if let Some(mut node) = self.nodes.get_mut(target_node) { + if !node.shards.contains(&shard.id) { + node.shards.push(shard.id.clone()); + } + } + + node_index += 1; + } + + // Увеличиваем версию кластера + self.cluster_version.fetch_add(1, Ordering::SeqCst); + + Ok(()) + } + + /// Исключение узла из кластера + pub fn evict_node(&mut self, node_id: &str) -> Result<(), ClusterError> { + // Проверяем, существует ли узел + if !self.nodes.contains_key(node_id) { + return Err(ClusterError::NodeNotFound(node_id.to_string())); + } + + // Нельзя исключить координатора, если нет других узлов + if node_id == self.get_coordinator_id() { + let online_nodes_count = self.nodes.iter() + .filter(|entry| entry.value().status == NodeStatus::Online) + .count(); + + if online_nodes_count <= 1 { + return Err(ClusterError::CannotEvictLastNode); + } + + // Нужно выбрать нового координатора перед исключением текущего + self.elect_new_coordinator()?; + } + + // Перераспределяем шарды исключаемого узла + if let Some(node) = self.nodes.get(node_id) { + for shard_id in &node.shards { + if let Some(mut shard) = self.shards.get_mut(shard_id) { + // Находим новый мастер для шарда + if let Some(new_master) = self.find_best_node_for_shard(shard_id) { + shard.master_node = new_master.clone(); + + // Добавляем шард к новому узлу + if let Some(mut new_node) = self.nodes.get_mut(&new_master) { + new_node.shards.push(shard_id.clone()); + } + } + } + } + } + + // Удаляем узел + self.nodes.remove(node_id); + + // Отправляем событие + let _ = self.event_sender.send(ClusterEvent::NodeEvicted { + node_id: node_id.to_string(), + }); + + Ok(()) + } + + /// Поиск лучшего узла для шард + fn find_best_node_for_shard(&self, shard_id: &str) -> Option { + // Простая эвристика: узел с наименьшим количеством шардов + self.nodes.iter() + .filter(|entry| entry.value().status == NodeStatus::Online) + .min_by_key(|entry| entry.shards.len()) + .map(|entry| entry.key().clone()) + } + + /// Добавление шарда + pub fn add_shard(&mut self, shard_id: &str, master_node: &str, slave_nodes: Vec) -> Result<(), ClusterError> { + // Проверяем существование мастер-узла + if !self.nodes.contains_key(master_node) { + return Err(ClusterError::NodeNotFound(master_node.to_string())); + } + + // Проверяем существование слейв-узлов + for slave in &slave_nodes { + if !self.nodes.contains_key(slave) { + return Err(ClusterError::NodeNotFound(slave.clone())); + } + } + + // Создаем шард + let shard = Shard { + id: shard_id.to_string(), + master_node: master_node.to_string(), + slave_nodes, + data_range: (0, u64::MAX), + replication_lag: 0, + }; + + self.shards.insert(shard_id.to_string(), shard); + + // Добавляем шард к мастер-узлу + if let Some(mut node) = self.nodes.get_mut(master_node) { + node.shards.push(shard_id.to_string()); + } + + Ok(()) + } + + /// Удаление шард + pub fn remove_shard(&mut self, shard_id: &str) -> Result<(), ClusterError> { + if self.shards.remove(shard_id).is_none() { + return Err(ClusterError::ShardNotFound(shard_id.to_string())); + } + + // Удаляем шард из всех узлов + for mut node in self.nodes.iter_mut() { + node.shards.retain(|id| id != shard_id); + } + + Ok(()) + } + + /// Начало репликации + pub fn start_replication(&mut self, source_node: &str, target_node: &str) -> Result<(), ClusterError> { + // Проверяем существование узлов + if !self.nodes.contains_key(source_node) || !self.nodes.contains_key(target_node) { + return Err(ClusterError::NodeNotFound(format!("{} or {}", source_node, target_node))); + } + + // Обновляем статус узлов + if let Some(mut source) = self.nodes.get_mut(source_node) { + source.status = NodeStatus::Syncing; + } + + if let Some(mut target) = self.nodes.get_mut(target_node) { + target.status = NodeStatus::Syncing; + } + + // В реальной реализации здесь будет логика репликации данных + // Для упрощения просто отправляем событие + + let _ = self.event_sender.send(ClusterEvent::ReplicationStarted { + source: source_node.to_string(), + target: target_node.to_string(), + }); + + Ok(()) + } + + /// Получение статуса кластера + pub fn get_cluster_status(&self) -> ClusterStatus { + let nodes: Vec = self.nodes.iter().map(|entry| entry.value().clone()).collect(); + let shards: Vec = self.shards.iter().map(|entry| entry.value().clone()).collect(); + + ClusterStatus { + coordinator_id: self.get_coordinator_id(), + coordinator_address: self.get_coordinator_address().unwrap_or_default(), + is_coordinator: self.is_coordinator.load(Ordering::Relaxed), + cluster_version: self.cluster_version.load(Ordering::Relaxed), + node_count: nodes.len(), + shard_count: shards.len(), + nodes, + shards, + } + } + + /// Запуск фоновых задач кластера + pub fn start_background_tasks(self: Arc) { + let cluster1 = Arc::clone(&self); + let cluster2 = Arc::clone(&self); + let cluster3 = Arc::clone(&self); + + // Задача отправки heartbeat + tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Отправляем heartbeat если мы координатор + if cluster1.is_coordinator.load(Ordering::Relaxed) { + cluster1.send_heartbeat().await; + } + } + }); + + // Задача обнаружения отказавших узлов + tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(10)); + + loop { + interval.tick().await; + + // Обнаружение отказавших узлов и перевыбор координатора если нужно + cluster2.detect_failed_nodes().await; + + // Проверяем, жив ли координатор + if let Err(e) = cluster2.check_coordinator_health().await { + log::warn!("Coordinator health check failed: {}", e); + // Попытка перевыбора координатора + let cluster_mut = Arc::clone(&cluster2); + let mut cluster_ref = (*cluster_mut).clone(); + if let Err(e) = cluster_ref.elect_new_coordinator() { + log::error!("Failed to elect new coordinator: {}", e); + } + } + } + }); + + // Задача проверки здоровья координатора + tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(15)); + + loop { + interval.tick().await; + cluster3.coordinator_health_check().await; + } + }); + } + + /// Проверка здоровья координатора + async fn coordinator_health_check(&self) { + if !self.is_coordinator.load(Ordering::Relaxed) { + // Если мы не координатор, проверяем жив ли координатор + let coordinator = self.get_current_coordinator(); + + if let Some(coord) = coordinator { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let heartbeat_timeout = 30; // 30 секунд + + if now - coord.last_heartbeat > heartbeat_timeout && coord.status == NodeStatus::Online { + log::warn!("Coordinator {} appears to be down (last heartbeat: {}s ago)", + coord.id, now - coord.last_heartbeat); + + // Обновляем статус координатора + if let Some(mut node) = self.nodes.get_mut(&coord.id) { + node.status = NodeStatus::Offline; + } + + // Отправляем событие + let _ = self.event_sender.send(ClusterEvent::CoordinatorFailed { + coordinator_id: coord.id.clone(), + }); + } + } else { + log::warn!("No coordinator defined in cluster"); + } + } + } + + /// Проверка здоровья координатора (альтернативная реализация) + async fn check_coordinator_health(&self) -> Result<(), String> { + if self.is_coordinator.load(Ordering::Relaxed) { + // Мы координатор - всегда здоровы + return Ok(()); + } + + let coordinator = self.get_current_coordinator(); + if let Some(coord) = coordinator { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let heartbeat_timeout = 30; // 30 секунд + + if now - coord.last_heartbeat > heartbeat_timeout { + return Err(format!("Coordinator {} heartbeat timeout", coord.id)); + } + + Ok(()) + } else { + Err("No coordinator defined".to_string()) + } + } + + /// Отправка heartbeat + async fn send_heartbeat(&self) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Обновляем время последнего heartbeat для координатора + let coordinator_id = self.get_coordinator_id(); + if let Some(mut node) = self.nodes.get_mut(&coordinator_id) { + node.last_heartbeat = now; + } + + // Отправляем событие + let _ = self.event_sender.send(ClusterEvent::Heartbeat { + timestamp: now, + coordinator_id: coordinator_id, + }); + } + + /// Обнаружение отказавших узлов + async fn detect_failed_nodes(&self) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let timeout = 30; // 30 секунд + + for mut node in self.nodes.iter_mut() { + if node.id != self.get_coordinator_id() && now - node.last_heartbeat > timeout { + node.status = NodeStatus::Offline; + + // Отправляем событие + let _ = self.event_sender.send(ClusterEvent::NodeFailed { + node_id: node.id.clone(), + }); + } + } + } +} + +/// Статус кластера +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterStatus { + pub coordinator_id: String, + pub coordinator_address: String, + pub is_coordinator: bool, + pub cluster_version: u64, + pub node_count: usize, + pub shard_count: usize, + pub nodes: Vec, + pub shards: Vec, +} + +/// События кластера +#[derive(Debug, Clone)] +pub enum ClusterEvent { + NodeJoined { + node_id: String, + address: String, + role: NodeRole, + }, + NodeEvicted { + node_id: String, + }, + NodeFailed { + node_id: String, + }, + ReplicationStarted { + source: String, + target: String, + }, + Heartbeat { + timestamp: u64, + coordinator_id: String, + }, + ShardMoved { + shard_id: String, + from_node: String, + to_node: String, + }, + CoordinatorChanged { + old_coordinator: String, + new_coordinator: String, + }, + CoordinatorFailed { + coordinator_id: String, + }, +} + +/// Менеджер репликации +#[derive(Clone)] +pub struct ReplicationManager { + cluster_manager: Arc, + replication_queue: Arc>>, + is_replicating: Arc, +} + +impl ReplicationManager { + pub fn new(cluster_manager: Arc) -> Self { + Self { + cluster_manager, + replication_queue: Arc::new(DashMap::new()), + is_replicating: Arc::new(AtomicBool::new(false)), + } + } + + /// Начало репликации данных + pub async fn replicate_data(&self, database_name: &str, table_name: &str) -> Result<(), ClusterError> { + // В реальной реализации здесь будет логика репликации данных таблицы + // Для упрощения просто добавляем задачу в очередь + + let task = ReplicationTask { + database: database_name.to_string(), + table: table_name.to_string(), + status: ReplicationStatus::Pending, + started_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + + self.replication_queue + .entry(database_name.to_string()) + .or_insert_with(Vec::new) + .push(task); + + Ok(()) + } + + /// Запуск фоновой репликации + pub fn start_background_replication(self: Arc) { + let manager = Arc::clone(&self); + + tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(1)); + + loop { + interval.tick().await; + + if !manager.is_replicating.load(Ordering::Relaxed) { + manager.process_replication_queue().await; + } + } + }); + } + + /// Обработка очереди репликации + async fn process_replication_queue(&self) { + self.is_replicating.store(true, Ordering::Relaxed); + + // Обрабатываем задачи репликации + for mut entry in self.replication_queue.iter_mut() { + let tasks = entry.value_mut(); + let mut completed_tasks = Vec::new(); + + for (i, task) in tasks.iter_mut().enumerate() { + if task.status == ReplicationStatus::Pending { + task.status = ReplicationStatus::InProgress; + + // В реальной реализации здесь будет репликация данных + // Для упрощения просто отмечаем как завершенную + task.status = ReplicationStatus::Completed; + completed_tasks.push(i); + } + } + + // Удаляем завершенные задачи + for i in completed_tasks.into_iter().rev() { + tasks.remove(i); + } + } + + self.is_replicating.store(false, Ordering::Relaxed); + } +} + +/// Задача репликации +#[derive(Debug, Clone)] +struct ReplicationTask { + database: String, + table: String, + status: ReplicationStatus, + started_at: u64, +} + +/// Статус репликации +#[derive(Debug, Clone, PartialEq, Copy)] +enum ReplicationStatus { + Pending, + InProgress, + Completed, + Failed, +} + +/// Ошибки кластера +#[derive(Debug, Error)] +pub enum ClusterError { + #[error("Node not found: {0}")] + NodeNotFound(String), + + #[error("Shard not found: {0}")] + ShardNotFound(String), + + #[error("Cannot evict coordinator")] + CannotEvictCoordinator, + + #[error("Cannot evict last node")] + CannotEvictLastNode, + + #[error("No nodes in cluster")] + NoNodes, + + #[error("Replication error: {0}")] + ReplicationError(String), + + #[error("Network error: {0}")] + NetworkError(String), + + #[error("Coordinator election failed: {0}")] + CoordinatorElectionFailed(String), +}