// src/server/sharding.rs //! Lock-free модуль шардинга с консистентным хэшированием и Raft протоколом //! //! Основные компоненты: //! 1. ShardingManager - управление распределением данных по узлам кластера //! 2. RaftState - состояния узлов в Raft протоколе для консенсуса //! 3. CollectionSharding - настройки шардинга для отдельных коллекций //! 4. Lock-free репликация с консистентным хэшированием //! //! Особенности: //! - Консистентное хэширование для равномерного распределения данных //! - Raft протокол для выбора лидера и консенсуса //! - Атомарные операции без блокировок //! - Автоматическая ребалансировка кластера use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use tokio::sync::mpsc; use tokio::time::{interval, Duration}; use tokio::io::AsyncWriteExt; use serde::{Serialize, Deserialize}; use siphasher::sip::SipHasher13; use crossbeam::queue::SegQueue; use crossbeam::epoch::{self, Atomic, Owned, Guard}; use dashmap::{DashMap, DashSet}; use crate::common::Result; use crate::common::protocol; /// Состояния узла в Raft протоколе #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum RaftState { Follower, Candidate, Leader, } /// Atomic Raft состояние для атомарных операций struct AtomicRaftState { inner: AtomicU64, } impl AtomicRaftState { fn new() -> Self { Self { inner: AtomicU64::new(0), } } fn get(&self) -> RaftState { match self.inner.load(Ordering::Acquire) { 0 => RaftState::Follower, 1 => RaftState::Candidate, 2 => RaftState::Leader, _ => RaftState::Follower, } } fn set(&self, state: RaftState) { let value = match state { RaftState::Follower => 0, RaftState::Candidate => 1, RaftState::Leader => 2, }; self.inner.store(value, Ordering::Release); } fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result { let current_val = match current { RaftState::Follower => 0, RaftState::Candidate => 1, RaftState::Leader => 2, }; let new_val = match new { RaftState::Follower => 0, RaftState::Candidate => 1, RaftState::Leader => 2, }; match self.inner.compare_exchange(current_val, new_val, order, Ordering::Relaxed) { Ok(_) => Ok(new), Err(actual_val) => { let actual_state = match actual_val { 0 => RaftState::Follower, 1 => RaftState::Candidate, 2 => RaftState::Leader, _ => RaftState::Follower, }; Err(actual_state) } } } } /// Информация о Raft узле #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RaftNode { pub node_id: String, pub address: String, pub state: RaftState, pub term: u64, pub voted_for: Option, pub last_heartbeat: i64, } /// Информация о шард-узле с Raft #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShardNode { pub node_id: String, pub address: String, pub capacity: u64, pub used: u64, pub collections: Vec, pub raft_info: RaftNode, } /// Состояние шардинга для коллекции #[derive(Debug, Clone)] pub struct CollectionSharding { pub shard_key: String, pub virtual_nodes: usize, // Используем Arc для совместного доступа из нескольких потоков pub ring: Arc>, } /// События репликации #[derive(Debug, Serialize, Deserialize)] pub enum ReplicationEvent { Command(protocol::Command), SyncRequest, Heartbeat, RaftVoteRequest { term: u64, candidate_id: String }, RaftVoteResponse { term: u64, vote_granted: bool }, RaftAppendEntries { term: u64, leader_id: String }, } /// Lock-Free очередь репликации на основе SegQueue struct LockFreeReplicationQueue { queue: SegQueue, size: AtomicUsize, } impl LockFreeReplicationQueue { fn new() -> Self { Self { queue: SegQueue::new(), size: AtomicUsize::new(0), } } fn push(&self, event: ReplicationEvent) { self.queue.push(event); self.size.fetch_add(1, Ordering::SeqCst); } fn pop(&self) -> Option { let event = self.queue.pop(); if event.is_some() { self.size.fetch_sub(1, Ordering::SeqCst); } event } fn len(&self) -> usize { self.size.load(Ordering::Acquire) } } /// Менеджер шардинга и репликации с Raft #[derive(Clone)] pub struct ShardingManager { // Шардинг компоненты nodes: Arc>, // Используем DashMap для атомарного доступа к настройкам шардинга коллекций collections: Arc>, virtual_nodes_per_node: usize, min_nodes_for_cluster: usize, // Raft компоненты с atomic операциями current_term: Arc, voted_for: Arc>, is_leader: Arc, raft_state: Arc, cluster_formed: Arc, // Репликация компоненты replication_queue: Arc, sequence_number: Arc, replication_enabled: Arc, node_id: String, } impl ShardingManager { pub fn new( virtual_nodes_per_node: usize, replication_enabled: bool, min_nodes_for_cluster: usize, node_id: String ) -> Self { // Создаем менеджер с начальными настройками let manager = Self { nodes: Arc::new(DashMap::new()), collections: Arc::new(DashMap::new()), virtual_nodes_per_node, min_nodes_for_cluster, current_term: Arc::new(AtomicU64::new(0)), voted_for: Arc::new(DashMap::new()), is_leader: Arc::new(AtomicBool::new(false)), raft_state: Arc::new(AtomicRaftState::new()), cluster_formed: Arc::new(AtomicBool::new(false)), replication_queue: Arc::new(LockFreeReplicationQueue::new()), sequence_number: Arc::new(AtomicU64::new(0)), replication_enabled: Arc::new(AtomicBool::new(replication_enabled)), node_id, }; // Добавляем текущий узел в кластер let _ = manager.add_node( manager.node_id.clone(), "127.0.0.1:8081".to_string(), 1024 * 1024 * 1024 ); // Запускаем фоновый цикл репликации let manager_clone = manager.clone(); tokio::spawn(async move { manager_clone.run_replication_loop().await; }); manager } async fn run_replication_loop(self) { let mut heartbeat_interval = interval(Duration::from_millis(1000)); let mut election_timeout = interval(Duration::from_millis(5000)); loop { tokio::select! { _ = heartbeat_interval.tick() => { if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) && self.cluster_formed.load(Ordering::SeqCst) { let _ = self.send_heartbeat().await; } } _ = election_timeout.tick() => { if !self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) && self.cluster_formed.load(Ordering::SeqCst) { let _ = self.start_election(); } } _ = tokio::time::sleep(Duration::from_millis(10)) => { while let Some(event) = self.replication_queue.pop() { self.handle_replication_event(event).await; } } } } } async fn handle_replication_event(&self, event: ReplicationEvent) { if !self.replication_enabled.load(Ordering::SeqCst) { return; } match event { ReplicationEvent::Command(cmd) => { self.replicate_command(cmd).await; } ReplicationEvent::SyncRequest => { self.sync_with_nodes().await; } ReplicationEvent::Heartbeat => { let _ = self.send_heartbeat().await; } ReplicationEvent::RaftVoteRequest { term, candidate_id } => { self.handle_vote_request(term, candidate_id).await; } ReplicationEvent::RaftVoteResponse { term, vote_granted } => { self.handle_vote_response(term, vote_granted).await; } ReplicationEvent::RaftAppendEntries { term, leader_id } => { self.handle_append_entries(term, leader_id).await; } } } async fn replicate_command(&self, command: protocol::Command) { if !self.cluster_formed.load(Ordering::SeqCst) { return; } let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); let nodes: Vec = self.nodes.iter() .map(|entry| entry.value().clone()) .collect(); for node in nodes { if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id { continue; } let node_addr = node.address.clone(); let cmd_clone = command.clone(); let seq_clone = sequence; tokio::spawn(async move { if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { eprintln!("Failed to replicate to {}: {}", node_addr, e); } }); } } async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { let mut stream = match tokio::net::TcpStream::connect(node).await { Ok(stream) => stream, Err(e) => { eprintln!("Failed to connect to {}: {}", node, e); return Ok(()); } }; let message = protocol::ReplicationMessage { sequence, command: command.clone(), timestamp: chrono::Utc::now().timestamp(), }; let bytes = protocol::serialize(&message)?; if let Err(e) = stream.write_all(&bytes).await { eprintln!("Failed to send command to {}: {}", node, e); } Ok(()) } async fn sync_with_nodes(&self) { if !self.cluster_formed.load(Ordering::SeqCst) { return; } let node_count = self.nodes.len(); println!("Starting sync with {} nodes", node_count); let nodes: Vec = self.nodes.iter() .map(|entry| entry.value().address.clone()) .collect(); for node_addr in nodes { tokio::spawn(async move { if let Err(e) = Self::sync_with_node(&node_addr).await { eprintln!("Failed to sync with {}: {}", node_addr, e); } }); } } async fn sync_with_node(_node: &str) -> Result<()> { // Заглушка для синхронизации Ok(()) } async fn send_heartbeat(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Ok(()); } let nodes: Vec = self.nodes.iter() .map(|entry| entry.value().clone()) .collect(); for node in nodes { if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id { let node_addr = node.address.clone(); tokio::spawn(async move { if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await { eprintln!("Heartbeat failed for {}: {}", node_addr, e); } }); } } Ok(()) } async fn send_heartbeat_to_node(node: &str) -> Result<()> { let mut stream = match tokio::net::TcpStream::connect(node).await { Ok(stream) => stream, Err(e) => { eprintln!("Failed to connect to {} for heartbeat: {}", node, e); return Ok(()); } }; let heartbeat = protocol::ReplicationMessage { sequence: 0, command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, timestamp: chrono::Utc::now().timestamp(), }; let bytes = protocol::serialize(&heartbeat)?; if let Err(e) = stream.write_all(&bytes).await { eprintln!("Failed to send heartbeat to {}: {}", node, e); } Ok(()) } async fn handle_vote_request(&self, term: u64, candidate_id: String) { let current_term = self.current_term.load(Ordering::SeqCst); if term > current_term { self.current_term.store(term, Ordering::SeqCst); self.voted_for.insert(term, candidate_id); } } async fn handle_vote_response(&self, term: u64, vote_granted: bool) { if vote_granted && term == self.current_term.load(Ordering::SeqCst) { let node_count = self.nodes.len(); if node_count >= self.min_nodes_for_cluster { match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) { Ok(_) => { self.is_leader.store(true, Ordering::SeqCst); println!("Elected as leader for term {}", term); } Err(_) => {} } } } } async fn handle_append_entries(&self, term: u64, leader_id: String) { let current_term = self.current_term.load(Ordering::SeqCst); if term >= current_term { self.current_term.store(term, Ordering::SeqCst); match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) { Ok(_) => { self.is_leader.store(false, Ordering::SeqCst); if let Some(mut node) = self.nodes.get_mut(&self.node_id) { node.raft_info.state = RaftState::Follower; node.raft_info.term = term; node.raft_info.last_heartbeat = chrono::Utc::now().timestamp(); } } Err(_) => {} } } } pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { let raft_node = RaftNode { node_id: node_id.clone(), address: address.clone(), state: RaftState::Follower, term: 0, voted_for: None, last_heartbeat: chrono::Utc::now().timestamp(), }; let node = ShardNode { node_id: node_id.clone(), address, capacity, used: 0, collections: Vec::new(), raft_info: raft_node, }; self.nodes.insert(node_id, node); let node_count = self.nodes.len(); if node_count >= self.min_nodes_for_cluster { self.cluster_formed.store(true, Ordering::SeqCst); println!("Cluster formed with {} nodes (minimum required: {})", node_count, self.min_nodes_for_cluster); } Ok(()) } pub fn remove_node(&self, node_id: &str) -> Result<()> { self.nodes.remove(node_id); let node_count = self.nodes.len(); if node_count < self.min_nodes_for_cluster { self.cluster_formed.store(false, Ordering::SeqCst); self.is_leader.store(false, Ordering::SeqCst); println!("Cluster no longer formed. Have {} nodes (need {})", node_count, self.min_nodes_for_cluster); } Ok(()) } pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } let sharding = CollectionSharding { shard_key: shard_key.to_string(), virtual_nodes: self.virtual_nodes_per_node, ring: Arc::new(DashMap::new()), }; self.collections.insert(collection.to_string(), sharding); self.rebuild_ring(collection)?; Ok(()) } fn rebuild_ring(&self, collection: &str) -> Result<()> { if let Some(mut entry) = self.collections.get_mut(collection) { let sharding = entry.value_mut(); // Очищаем ring sharding.ring.clear(); let nodes: Vec = self.nodes.iter() .map(|node_entry| node_entry.key().clone()) .collect(); for node_id in nodes { for i in 0..sharding.virtual_nodes { let key = format!("{}-{}", node_id, i); let hash = self.hash_key(&key); sharding.ring.insert(hash, node_id.clone()); } } } Ok(()) } fn hash_key(&self, key: &str) -> u64 { let mut hasher = SipHasher13::new(); key.hash(&mut hasher); hasher.finish() } pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot find node: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } if let Some(sharding) = self.collections.get(collection) { let key_hash = self.hash_key(key_value); // Ищем ближайший узел в ConcurrentHashMap // Собираем все записи в вектор let mut entries: Vec<(u64, String)> = sharding.ring.iter() .map(|entry| (*entry.key(), entry.value().clone())) .collect(); // Сортируем по хэшу entries.sort_by_key(|&(hash, _)| hash); // Находим первый узел с хэшем >= key_hash for (hash, node_id) in &entries { if *hash >= key_hash { return Ok(Some(node_id.clone())); } } // Если не нашли, возвращаем первый узел // Используем итерацию по срезу, чтобы не перемещать вектор if let Some((_, node_id)) = entries.first() { return Ok(Some(node_id.clone())); } } Ok(None) } pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } if !self.nodes.contains_key(from_node) { return Err(crate::common::FutriixError::ShardingError( format!("Source node '{}' not found in cluster", from_node) )); } if !self.nodes.contains_key(to_node) { return Err(crate::common::FutriixError::ShardingError( format!("Destination node '{}' not found in cluster", to_node) )); } println!("Migrating shard for collection '{}' from {} to {} with key {}", collection, from_node, to_node, shard_key); self.rebuild_ring(collection)?; Ok(()) } pub fn rebalance_cluster(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } let node_count = self.nodes.len(); println!("Rebalancing cluster with {} nodes", node_count); // Перестраиваем все rings for key in self.collections.iter().map(|entry| entry.key().clone()).collect::>() { self.rebuild_ring(&key)?; } self.rebalance_nodes()?; Ok(()) } fn rebalance_nodes(&self) -> Result<()> { println!("Rebalancing nodes in cluster..."); let mut total_capacity = 0; let mut total_used = 0; let mut nodes_info = Vec::new(); for node in self.nodes.iter() { total_capacity += node.capacity; total_used += node.used; nodes_info.push((node.node_id.clone(), node.used, node.capacity)); } let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); let mut overloaded_nodes = Vec::new(); let mut underloaded_nodes = Vec::new(); for (node_id, used, capacity) in nodes_info { let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 }; if usage > avg_usage * 1.2 { overloaded_nodes.push((node_id, usage)); } else if usage < avg_usage * 0.8 { underloaded_nodes.push((node_id, usage)); } } println!("Overloaded nodes: {}", overloaded_nodes.len()); println!("Underloaded nodes: {}", underloaded_nodes.len()); Ok(()) } pub fn get_cluster_status(&self) -> Result { let mut cluster_nodes = Vec::new(); let mut total_capacity = 0; let mut total_used = 0; let mut raft_nodes = Vec::new(); for node in self.nodes.iter() { total_capacity += node.capacity; total_used += node.used; cluster_nodes.push(protocol::ShardInfo { node_id: node.node_id.clone(), address: node.address.clone(), capacity: node.capacity, used: node.used, collections: node.collections.clone(), }); raft_nodes.push(protocol::RaftNodeInfo { node_id: node.node_id.clone(), address: node.address.clone(), state: match node.raft_info.state { RaftState::Leader => "leader".to_string(), RaftState::Follower => "follower".to_string(), RaftState::Candidate => "candidate".to_string(), }, term: node.raft_info.term, last_heartbeat: node.raft_info.last_heartbeat, }); } let rebalance_needed = { if total_capacity == 0 { false } else { let avg_usage = total_used as f64 / total_capacity as f64; let mut needs_rebalance = false; for node in self.nodes.iter() { let usage = if node.capacity > 0 { node.used as f64 / node.capacity as f64 } else { 0.0 }; if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 { needs_rebalance = true; break; } } needs_rebalance } }; Ok(protocol::ClusterStatus { nodes: cluster_nodes, total_capacity, total_used, rebalance_needed, cluster_formed: self.cluster_formed.load(Ordering::SeqCst), leader_exists: self.is_leader.load(Ordering::SeqCst), raft_nodes, }) } pub fn get_raft_nodes(&self) -> Vec { self.nodes.iter() .map(|node| node.raft_info.clone()) .collect() } pub fn is_cluster_formed(&self) -> bool { self.cluster_formed.load(Ordering::SeqCst) } pub fn start_election(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) )); } let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; println!("Starting election for term {}", new_term); self.is_leader.store(false, Ordering::SeqCst); match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) { Ok(_) => { if let Some(mut node) = self.nodes.get_mut(&self.node_id) { node.raft_info.state = RaftState::Candidate; node.raft_info.term = new_term; node.raft_info.voted_for = Some(self.node_id.clone()); } self.replication_queue.push(ReplicationEvent::RaftVoteRequest { term: new_term, candidate_id: self.node_id.clone(), }); } Err(current_state) => { println!("Already in state {:?}, cannot start election", current_state); } } Ok(()) } pub async fn replicate(&self, command: protocol::Command) -> Result<()> { if !self.replication_enabled.load(Ordering::SeqCst) { return Ok(()); } if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( "Cannot replicate: cluster not formed".to_string() )); } self.replication_queue.push(ReplicationEvent::Command(command)); Ok(()) } pub async fn request_sync(&self) -> Result<()> { if !self.replication_enabled.load(Ordering::SeqCst) { return Ok(()); } self.replication_queue.push(ReplicationEvent::SyncRequest); Ok(()) } pub fn get_nodes(&self) -> Vec { self.nodes.iter() .map(|node| node.clone()) .collect() } pub fn get_sequence_number(&self) -> u64 { self.sequence_number.load(Ordering::SeqCst) } pub fn is_replication_enabled(&self) -> bool { self.replication_enabled.load(Ordering::SeqCst) } pub fn get_node(&self, node_id: &str) -> Option { self.nodes.get(node_id).map(|entry| entry.clone()) } pub fn get_node_id(&self) -> &str { &self.node_id } }