// src/server/sharding.rs //! Модуль шардинга с консистентным хэшированием и Raft протоколом //! //! Объединяет функционал шардинга и репликации с lock-free архитектурой //! и реализацией Raft консенсуса для работы в production. use std::collections::{HashMap, BTreeMap}; use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use tokio::sync::mpsc; use tokio::time::{interval, Duration}; use tokio::io::AsyncWriteExt; use serde::{Serialize, Deserialize}; use siphasher::sip::SipHasher13; use dashmap::DashMap; use crossbeam::queue::SegQueue; use crate::common::Result; use crate::common::protocol; /// Состояния узла в Raft протоколе #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum RaftState { Follower, Candidate, Leader, } /// Информация о Raft узле #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RaftNode { pub node_id: String, pub address: String, pub state: RaftState, pub term: u64, pub voted_for: Option, pub last_heartbeat: i64, } /// Информация о шард-узле с Raft #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShardNode { pub node_id: String, pub address: String, pub capacity: u64, pub used: u64, pub collections: Vec, pub raft_info: RaftNode, } /// Состояние шардинга для коллекции #[derive(Debug, Clone)] pub struct CollectionSharding { pub shard_key: String, pub virtual_nodes: usize, pub ring: BTreeMap, // consistent hash ring } /// События репликации #[derive(Debug, Serialize, Deserialize)] pub enum ReplicationEvent { Command(protocol::Command), SyncRequest, Heartbeat, RaftVoteRequest { term: u64, candidate_id: String }, RaftVoteResponse { term: u64, vote_granted: bool }, RaftAppendEntries { term: u64, leader_id: String }, } /// Lock-Free очередь репликации struct LockFreeReplicationQueue { queue: SegQueue, size: AtomicUsize, } impl LockFreeReplicationQueue { fn new() -> Self { Self { queue: SegQueue::new(), size: AtomicUsize::new(0), } } fn push(&self, event: ReplicationEvent) { self.queue.push(event); self.size.fetch_add(1, Ordering::SeqCst); } fn pop(&self) -> Option { let event = self.queue.pop(); if event.is_some() { self.size.fetch_sub(1, Ordering::SeqCst); } event } fn len(&self) -> usize { self.size.load(Ordering::Acquire) } } /// Менеджер шардинга и репликации с Raft #[derive(Clone)] pub struct ShardingManager { // Шардинг компоненты nodes: Arc>, // Lock-free хранение узлов collections: Arc>, // Lock-free хранение коллекций virtual_nodes_per_node: usize, min_nodes_for_cluster: usize, // Raft компоненты current_term: Arc, // Текущий терм Raft voted_for: Arc>, // Голоса за термы is_leader: Arc, // Флаг лидера cluster_formed: Arc, // Флаг сформированности кластера // Репликация компоненты replication_queue: Arc, sequence_number: Arc, replication_enabled: Arc, node_id: String, // ID текущего узла } impl ShardingManager { /// Создание нового менеджера шардинга и репликации pub fn new( virtual_nodes_per_node: usize, replication_enabled: bool, min_nodes_for_cluster: usize, node_id: String ) -> Self { let manager = Self { nodes: Arc::new(DashMap::new()), collections: Arc::new(DashMap::new()), virtual_nodes_per_node, min_nodes_for_cluster, current_term: Arc::new(AtomicU64::new(0)), voted_for: Arc::new(DashMap::new()), is_leader: Arc::new(AtomicBool::new(false)), cluster_formed: Arc::new(AtomicBool::new(false)), replication_queue: Arc::new(LockFreeReplicationQueue::new()), sequence_number: Arc::new(AtomicU64::new(0)), replication_enabled: Arc::new(AtomicBool::new(replication_enabled)), node_id, }; // Добавляем текущий узел в кластер let _ = manager.add_node( manager.node_id.clone(), "127.0.0.1:8081".to_string(), 1024 * 1024 * 1024 ); // Запуск фоновой задачи обработки репликации и Raft let manager_clone = manager.clone(); tokio::spawn(async move { manager_clone.run_replication_loop().await; }); manager } /// Фоновая задача обработки репликации и Raft async fn run_replication_loop(self) { let mut heartbeat_interval = interval(Duration::from_millis(1000)); let mut election_timeout = interval(Duration::from_millis(5000)); loop { tokio::select! { _ = heartbeat_interval.tick() => { // ИСПРАВЛЕНИЕ: Проверяем, что кластер сформирован перед отправкой heartbeat if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) && self.cluster_formed.load(Ordering::SeqCst) { let _ = self.send_heartbeat().await; } } _ = election_timeout.tick() => { // Если мы follower и не получали heartbeat, начинаем выборы if !self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) && self.cluster_formed.load(Ordering::SeqCst) { let _ = self.start_election(); } } _ = tokio::time::sleep(Duration::from_millis(10)) => { // Обработка событий из lock-free очереди while let Some(event) = self.replication_queue.pop() { self.handle_replication_event(event).await; } } } } } /// Обработка событий репликации async fn handle_replication_event(&self, event: ReplicationEvent) { if !self.replication_enabled.load(Ordering::SeqCst) { return; } match event { ReplicationEvent::Command(cmd) => { self.replicate_command(cmd).await; } ReplicationEvent::SyncRequest => { self.sync_with_nodes().await; } ReplicationEvent::Heartbeat => { let _ = self.send_heartbeat().await; } ReplicationEvent::RaftVoteRequest { term, candidate_id } => { self.handle_vote_request(term, candidate_id).await; } ReplicationEvent::RaftVoteResponse { term, vote_granted } => { self.handle_vote_response(term, vote_granted).await; } ReplicationEvent::RaftAppendEntries { term, leader_id } => { self.handle_append_entries(term, leader_id).await; } } } /// Репликация команды на другие узлы async fn replicate_command(&self, command: protocol::Command) { if !self.cluster_formed.load(Ordering::SeqCst) { return; } let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); for entry in self.nodes.iter() { let node = entry.value(); // Реплицируем на все узлы, кроме текущего лидера (если мы лидер) if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id { continue; } let node_addr = node.address.clone(); let cmd_clone = command.clone(); let seq_clone = sequence; tokio::spawn(async move { if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { eprintln!("Failed to replicate to {}: {}", node_addr, e); } }); } } /// Отправка команды на удаленный узел async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { let mut stream = match tokio::net::TcpStream::connect(node).await { Ok(stream) => stream, Err(e) => { eprintln!("Failed to connect to {}: {}", node, e); return Ok(()); } }; let message = protocol::ReplicationMessage { sequence, command: command.clone(), timestamp: chrono::Utc::now().timestamp(), }; let bytes = protocol::serialize(&message)?; if let Err(e) = stream.write_all(&bytes).await { eprintln!("Failed to send command to {}: {}", node, e); } Ok(()) } /// Синхронизация с другими узлами async fn sync_with_nodes(&self) { if !self.cluster_formed.load(Ordering::SeqCst) { return; } println!("Starting sync with {} nodes", self.nodes.len()); for entry in self.nodes.iter() { let node_addr = entry.value().address.clone(); tokio::spawn(async move { if let Err(e) = Self::sync_with_node(&node_addr).await { eprintln!("Failed to sync with {}: {}", node_addr, e); } }); } } /// Синхронизация с удаленным узлом async fn sync_with_node(_node: &str) -> Result<()> { // TODO: Реализовать lock-free синхронизацию данных Ok(()) } /// Отправка heartbeat async fn send_heartbeat(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Ok(()); } for entry in self.nodes.iter() { let node = entry.value(); // Отправляем heartbeat только на follower узлы, кроме себя if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id { let node_addr = node.address.clone(); tokio::spawn(async move { if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await { eprintln!("Heartbeat failed for {}: {}", node_addr, e); } }); } } Ok(()) } /// Отправка heartbeat на удаленный узел async fn send_heartbeat_to_node(node: &str) -> Result<()> { let mut stream = match tokio::net::TcpStream::connect(node).await { Ok(stream) => stream, Err(e) => { eprintln!("Failed to connect to {} for heartbeat: {}", node, e); return Ok(()); } }; let heartbeat = protocol::ReplicationMessage { sequence: 0, command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, timestamp: chrono::Utc::now().timestamp(), }; let bytes = protocol::serialize(&heartbeat)?; if let Err(e) = stream.write_all(&bytes).await { eprintln!("Failed to send heartbeat to {}: {}", node, e); } Ok(()) } // Raft методы /// Обработка запроса голоса async fn handle_vote_request(&self, term: u64, candidate_id: String) { let current_term = self.current_term.load(Ordering::SeqCst); if term > current_term { self.current_term.store(term, Ordering::SeqCst); self.voted_for.insert(term, candidate_id.clone()); // TODO: Отправить положительный ответ } // TODO: Отправить отрицательный ответ если условия не выполнены } /// Обработка ответа голоса async fn handle_vote_response(&self, term: u64, vote_granted: bool) { if vote_granted && term == self.current_term.load(Ordering::SeqCst) { // TODO: Подсчитать голоса и перейти в лидеры при большинстве } } /// Обработка AppendEntries RPC async fn handle_append_entries(&self, term: u64, leader_id: String) { let current_term = self.current_term.load(Ordering::SeqCst); if term >= current_term { self.current_term.store(term, Ordering::SeqCst); self.is_leader.store(false, Ordering::SeqCst); // Обновляем состояние узла как follower if let Some(mut node) = self.nodes.get_mut(&self.node_id) { node.raft_info.state = RaftState::Follower; node.raft_info.term = term; node.raft_info.last_heartbeat = chrono::Utc::now().timestamp(); } } } // Шардинг методы /// Добавление шард-узла с Raft информацией pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { let raft_node = RaftNode { node_id: node_id.clone(), address: address.clone(), state: RaftState::Follower, term: 0, voted_for: None, last_heartbeat: chrono::Utc::now().timestamp(), }; let node = ShardNode { node_id: node_id.clone(), address, capacity, used: 0, collections: Vec::new(), raft_info: raft_node, }; self.nodes.insert(node_id, node); // Проверяем сформированность кластера if self.nodes.len() >= self.min_nodes_for_cluster { self.cluster_formed.store(true, Ordering::SeqCst); println!("Cluster formed with {} nodes (minimum required: {})", self.nodes.len(), self.min_nodes_for_cluster); } Ok(()) } /// Удаление шард-узла pub fn remove_node(&self, node_id: &str) -> Result<()> { self.nodes.remove(node_id); // Проверяем сформированность кластера после удаления if self.nodes.len() < self.min_nodes_for_cluster { self.cluster_formed.store(false, Ordering::SeqCst); self.is_leader.store(false, Ordering::SeqCst); println!("Cluster no longer formed. Have {} nodes (need {})", self.nodes.len(), self.min_nodes_for_cluster); } Ok(()) } /// Настройка шардинга для коллекции pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { // Проверка наличия кластера перед настройкой шардинга if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } let sharding = CollectionSharding { shard_key: shard_key.to_string(), virtual_nodes: self.virtual_nodes_per_node, ring: BTreeMap::new(), }; self.collections.insert(collection.to_string(), sharding); self.rebuild_ring(collection)?; Ok(()) } /// Перестроение хэш-ринга для коллекции fn rebuild_ring(&self, collection: &str) -> Result<()> { if let Some(mut sharding) = self.collections.get_mut(collection) { sharding.ring.clear(); for entry in self.nodes.iter() { let node_id = entry.key(); for i in 0..sharding.virtual_nodes { let key = format!("{}-{}", node_id, i); let hash = self.hash_key(&key); sharding.ring.insert(hash, node_id.clone()); } } } Ok(()) } /// Хэширование ключа fn hash_key(&self, key: &str) -> u64 { let mut hasher = SipHasher13::new(); key.hash(&mut hasher); hasher.finish() } /// Поиск узла для ключа pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { // Проверка наличия кластера перед поиском узла if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot find node: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } if let Some(sharding) = self.collections.get(collection) { let key_hash = self.hash_key(key_value); // Поиск в хэш-ринге (консистентное хэширование) let mut range = sharding.ring.range(key_hash..); if let Some((_, node_id)) = range.next() { return Ok(Some(node_id.clone())); } // Если не найдено в верхней части ринга, берем первый узел if let Some((_, node_id)) = sharding.ring.iter().next() { return Ok(Some(node_id.clone())); } } Ok(None) } /// Миграция шарда pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { // Проверка наличия кластера перед миграцией if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } // Проверяем существование узлов if !self.nodes.contains_key(from_node) { return Err(crate::common::FutriixError::ShardingError( format!("Source node '{}' not found in cluster", from_node) )); } if !self.nodes.contains_key(to_node) { return Err(crate::common::FutriixError::ShardingError( format!("Destination node '{}' not found in cluster", to_node) )); } println!("Migrating shard for collection '{}' from {} to {} with key {}", collection, from_node, to_node, shard_key); // TODO: Реализовать фактическую миграцию данных self.rebuild_ring(collection)?; Ok(()) } /// Ребалансировка кластера pub fn rebalance_cluster(&self) -> Result<()> { // Проверка наличия кластера перед ребалансировкой if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } println!("Rebalancing cluster with {} nodes", self.nodes.len()); // Перестраиваем все хэш-ринги for mut entry in self.collections.iter_mut() { let sharding = entry.value_mut(); sharding.ring.clear(); for node_entry in self.nodes.iter() { let node_id = node_entry.key(); for i in 0..sharding.virtual_nodes { let key = format!("{}-{}", node_id, i); let hash = self.hash_key(&key); sharding.ring.insert(hash, node_id.clone()); } } } // Ребалансировка узлов кластера self.rebalance_nodes()?; Ok(()) } /// Ребалансировка узлов кластера fn rebalance_nodes(&self) -> Result<()> { println!("Rebalancing nodes in cluster..."); // Рассчитываем среднюю загрузку let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum(); let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum(); let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); // Находим перегруженные и недогруженные узлы let mut overloaded_nodes = Vec::new(); let mut underloaded_nodes = Vec::new(); for entry in self.nodes.iter() { let node = entry.value(); let usage = if node.capacity > 0 { node.used as f64 / node.capacity as f64 } else { 0.0 }; if usage > avg_usage * 1.2 { // Более чем на 20% выше среднего overloaded_nodes.push((node.node_id.clone(), usage)); } else if usage < avg_usage * 0.8 { // Более чем на 20% ниже среднего underloaded_nodes.push((node.node_id.clone(), usage)); } } println!("Overloaded nodes: {}", overloaded_nodes.len()); println!("Underloaded nodes: {}", underloaded_nodes.len()); // TODO: Реализовать алгоритм миграции данных между узлами Ok(()) } /// Получение статуса кластера pub fn get_cluster_status(&self) -> Result { let mut cluster_nodes = Vec::new(); let mut total_capacity = 0; let mut total_used = 0; let mut raft_nodes = Vec::new(); for entry in self.nodes.iter() { let node = entry.value(); total_capacity += node.capacity; total_used += node.used; cluster_nodes.push(protocol::ShardInfo { node_id: node.node_id.clone(), address: node.address.clone(), capacity: node.capacity, used: node.used, collections: node.collections.clone(), }); raft_nodes.push(protocol::RaftNodeInfo { node_id: node.node_id.clone(), address: node.address.clone(), state: match node.raft_info.state { RaftState::Leader => "leader".to_string(), RaftState::Follower => "follower".to_string(), RaftState::Candidate => "candidate".to_string(), }, term: node.raft_info.term, last_heartbeat: node.raft_info.last_heartbeat, }); } // Проверяем, нужна ли ребалансировка let rebalance_needed = { if total_capacity == 0 { false } else { let avg_usage = total_used as f64 / total_capacity as f64; let mut needs_rebalance = false; for node in self.nodes.iter() { let usage = if node.value().capacity > 0 { node.value().used as f64 / node.value().capacity as f64 } else { 0.0 }; if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 { needs_rebalance = true; break; } } needs_rebalance } }; Ok(protocol::ClusterStatus { nodes: cluster_nodes, total_capacity, total_used, rebalance_needed, cluster_formed: self.cluster_formed.load(Ordering::SeqCst), leader_exists: self.is_leader.load(Ordering::SeqCst), raft_nodes, }) } /// Получение списка Raft узлов pub fn get_raft_nodes(&self) -> Vec { self.nodes.iter() .map(|entry| entry.value().raft_info.clone()) .collect() } /// Проверка сформированности кластера pub fn is_cluster_formed(&self) -> bool { self.cluster_formed.load(Ordering::SeqCst) } /// Raft выборы - начало кампании pub fn start_election(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; println!("Starting election for term {}", new_term); // Переход в состояние candidate self.is_leader.store(false, Ordering::SeqCst); // Обновляем состояние текущего узла if let Some(mut node) = self.nodes.get_mut(&self.node_id) { node.raft_info.state = RaftState::Candidate; node.raft_info.term = new_term; node.raft_info.voted_for = Some(self.node_id.clone()); } // Отправляем запросы на голосование self.replication_queue.push(ReplicationEvent::RaftVoteRequest { term: new_term, candidate_id: self.node_id.clone(), }); Ok(()) } /// Отправка команды на репликацию pub async fn replicate(&self, command: protocol::Command) -> Result<()> { if !self.replication_enabled.load(Ordering::SeqCst) { return Ok(()); } if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( "Cannot replicate: cluster not formed".to_string() )); } self.replication_queue.push(ReplicationEvent::Command(command)); Ok(()) } /// Запрос синхронизации с другими узлами pub async fn request_sync(&self) -> Result<()> { if !self.replication_enabled.load(Ordering::SeqCst) { return Ok(()); } self.replication_queue.push(ReplicationEvent::SyncRequest); Ok(()) } /// Получение списка узлов репликации pub fn get_nodes(&self) -> Vec { self.nodes.iter() .map(|entry| entry.value().clone()) .collect() } /// Получение текущего номера последовательности pub fn get_sequence_number(&self) -> u64 { self.sequence_number.load(Ordering::SeqCst) } /// Проверка, включена ли репликация pub fn is_replication_enabled(&self) -> bool { self.replication_enabled.load(Ordering::SeqCst) } /// Получение информации об узле pub fn get_node(&self, node_id: &str) -> Option { self.nodes.get(node_id).map(|entry| entry.value().clone()) } /// Получение ID текущего узла pub fn get_node_id(&self) -> &str { &self.node_id } }