From 5dbe2af865cb3bd365e184858830a11de802f4a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sat, 6 Dec 2025 15:56:26 +0000 Subject: [PATCH] Delete src/server/sharding.rs --- src/server/sharding.rs | 978 ----------------------------------------- 1 file changed, 978 deletions(-) delete mode 100644 src/server/sharding.rs diff --git a/src/server/sharding.rs b/src/server/sharding.rs deleted file mode 100644 index 8f16659..0000000 --- a/src/server/sharding.rs +++ /dev/null @@ -1,978 +0,0 @@ -// src/server/sharding.rs -//! Модуль шардинга с консистентным хэшированием и Raft протоколом -//! -//! Объединяет функционал шардинга и репликации с lock-free архитектурой -//! и реализацией Raft консенсуса для работы в production. - -use std::collections::{HashMap, BTreeMap}; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; -use tokio::sync::mpsc; -use tokio::time::{interval, Duration}; -use tokio::io::AsyncWriteExt; -use serde::{Serialize, Deserialize}; -use siphasher::sip::SipHasher13; -use crossbeam::queue::SegQueue; -use crossbeam::epoch::{self, Atomic, Owned, Guard}; - -use crate::common::Result; -use crate::common::protocol; - -/// Состояния узла в Raft протоколе -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum RaftState { - Follower, - Candidate, - Leader, -} - -/// Atomic Raft состояние -struct AtomicRaftState { - inner: AtomicU64, // 0=Follower, 1=Candidate, 2=Leader -} - -impl AtomicRaftState { - fn new() -> Self { - Self { - inner: AtomicU64::new(0), - } - } - - fn get(&self) -> RaftState { - match self.inner.load(Ordering::Acquire) { - 0 => RaftState::Follower, - 1 => RaftState::Candidate, - 2 => RaftState::Leader, - _ => RaftState::Follower, - } - } - - fn set(&self, state: RaftState) { - let value = match state { - RaftState::Follower => 0, - RaftState::Candidate => 1, - RaftState::Leader => 2, - }; - self.inner.store(value, Ordering::Release); - } - - fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result { - let current_val = match current { - RaftState::Follower => 0, - RaftState::Candidate => 1, - RaftState::Leader => 2, - }; - - let new_val = match new { - RaftState::Follower => 0, - RaftState::Candidate => 1, - RaftState::Leader => 2, - }; - - match self.inner.compare_exchange(current_val, new_val, order, Ordering::Relaxed) { - Ok(_) => Ok(new), - Err(actual_val) => { - let actual_state = match actual_val { - 0 => RaftState::Follower, - 1 => RaftState::Candidate, - 2 => RaftState::Leader, - _ => RaftState::Follower, - }; - Err(actual_state) - } - } - } -} - -/// Информация о Raft узле с atomic операциями -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RaftNode { - pub node_id: String, - pub address: String, - pub state: RaftState, - pub term: u64, - pub voted_for: Option, - pub last_heartbeat: i64, -} - -/// Lock-Free хэш-таблица для узлов -#[derive(Clone)] -struct LockFreeNodeMap { - inner: Atomic>, -} - -impl LockFreeNodeMap { - fn new() -> Self { - Self { - inner: Atomic::new(HashMap::new()), - } - } - - fn insert(&self, key: String, value: ShardNode, guard: &Guard) -> Option { - loop { - let current = self.inner.load(Ordering::Acquire, guard); - let mut new_map = HashMap::new(); - - if let Some(ref map) = unsafe { current.as_ref() } { - new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone()))); - } - - let old_value = new_map.insert(key.clone(), value.clone()); - - let new_ptr = Owned::new(new_map); - if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { - return old_value; - } - } - } - - fn get(&self, key: &str, guard: &Guard) -> Option { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - map.get(key).cloned() - } else { - None - } - } - - fn remove(&self, key: &str, guard: &Guard) -> Option { - loop { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - let mut new_map = HashMap::new(); - new_map.extend(map.iter().filter(|(k, _)| **k != key).map(|(k, v)| (k.clone(), v.clone()))); - - let new_ptr = Owned::new(new_map); - if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() { - return map.get(key).cloned(); - } - } else { - return None; - } - } - } - - fn len(&self, guard: &Guard) -> usize { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - map.len() - } else { - 0 - } - } - - fn iter<'a>(&'a self, guard: &'a Guard) -> Vec<(&'a String, &'a ShardNode)> { - let current = self.inner.load(Ordering::Acquire, guard); - if let Some(map) = unsafe { current.as_ref() } { - map.iter().collect() - } else { - Vec::new() - } - } -} - -/// Информация о шард-узле с Raft -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ShardNode { - pub node_id: String, - pub address: String, - pub capacity: u64, - pub used: u64, - pub collections: Vec, - pub raft_info: RaftNode, -} - -/// Состояние шардинга для коллекции -#[derive(Debug, Clone)] -pub struct CollectionSharding { - pub shard_key: String, - pub virtual_nodes: usize, - pub ring: BTreeMap, -} - -/// События репликации -#[derive(Debug, Serialize, Deserialize)] -pub enum ReplicationEvent { - Command(protocol::Command), - SyncRequest, - Heartbeat, - RaftVoteRequest { term: u64, candidate_id: String }, - RaftVoteResponse { term: u64, vote_granted: bool }, - RaftAppendEntries { term: u64, leader_id: String }, -} - -/// Lock-Free очередь репликации -struct LockFreeReplicationQueue { - queue: SegQueue, - size: AtomicUsize, -} - -impl LockFreeReplicationQueue { - fn new() -> Self { - Self { - queue: SegQueue::new(), - size: AtomicUsize::new(0), - } - } - - fn push(&self, event: ReplicationEvent) { - self.queue.push(event); - self.size.fetch_add(1, Ordering::SeqCst); - } - - fn pop(&self) -> Option { - let event = self.queue.pop(); - if event.is_some() { - self.size.fetch_sub(1, Ordering::SeqCst); - } - event - } - - fn len(&self) -> usize { - self.size.load(Ordering::Acquire) - } -} - -/// Менеджер шардинга и репликации с Raft -#[derive(Clone)] -pub struct ShardingManager { - // Шардинг компоненты - nodes: Arc, - collections: Arc>>, - virtual_nodes_per_node: usize, - min_nodes_for_cluster: usize, - - // Raft компоненты с atomic операциями - current_term: Arc, - voted_for: Arc>>, - is_leader: Arc, - raft_state: Arc, - cluster_formed: Arc, - - // Репликация компоненты - replication_queue: Arc, - sequence_number: Arc, - replication_enabled: Arc, - node_id: String, -} - -impl ShardingManager { - pub fn new( - virtual_nodes_per_node: usize, - replication_enabled: bool, - min_nodes_for_cluster: usize, - node_id: String - ) -> Self { - let manager = Self { - nodes: Arc::new(LockFreeNodeMap::new()), - collections: Arc::new(Atomic::new(HashMap::new())), - virtual_nodes_per_node, - min_nodes_for_cluster, - current_term: Arc::new(AtomicU64::new(0)), - voted_for: Arc::new(Atomic::new(HashMap::new())), - is_leader: Arc::new(AtomicBool::new(false)), - raft_state: Arc::new(AtomicRaftState::new()), - cluster_formed: Arc::new(AtomicBool::new(false)), - replication_queue: Arc::new(LockFreeReplicationQueue::new()), - sequence_number: Arc::new(AtomicU64::new(0)), - replication_enabled: Arc::new(AtomicBool::new(replication_enabled)), - node_id, - }; - - let _ = manager.add_node( - manager.node_id.clone(), - "127.0.0.1:8081".to_string(), - 1024 * 1024 * 1024 - ); - - let manager_clone = manager.clone(); - tokio::spawn(async move { - manager_clone.run_replication_loop().await; - }); - - manager - } - - async fn run_replication_loop(self) { - let mut heartbeat_interval = interval(Duration::from_millis(1000)); - let mut election_timeout = interval(Duration::from_millis(5000)); - - loop { - tokio::select! { - _ = heartbeat_interval.tick() => { - if self.is_leader.load(Ordering::SeqCst) && - self.replication_enabled.load(Ordering::SeqCst) && - self.cluster_formed.load(Ordering::SeqCst) { - let _ = self.send_heartbeat().await; - } - } - _ = election_timeout.tick() => { - if !self.is_leader.load(Ordering::SeqCst) && - self.replication_enabled.load(Ordering::SeqCst) && - self.cluster_formed.load(Ordering::SeqCst) { - let _ = self.start_election(); - } - } - _ = tokio::time::sleep(Duration::from_millis(10)) => { - while let Some(event) = self.replication_queue.pop() { - self.handle_replication_event(event).await; - } - } - } - } - } - - async fn handle_replication_event(&self, event: ReplicationEvent) { - if !self.replication_enabled.load(Ordering::SeqCst) { - return; - } - - match event { - ReplicationEvent::Command(cmd) => { - self.replicate_command(cmd).await; - } - ReplicationEvent::SyncRequest => { - self.sync_with_nodes().await; - } - ReplicationEvent::Heartbeat => { - let _ = self.send_heartbeat().await; - } - ReplicationEvent::RaftVoteRequest { term, candidate_id } => { - self.handle_vote_request(term, candidate_id).await; - } - ReplicationEvent::RaftVoteResponse { term, vote_granted } => { - self.handle_vote_response(term, vote_granted).await; - } - ReplicationEvent::RaftAppendEntries { term, leader_id } => { - self.handle_append_entries(term, leader_id).await; - } - } - } - - async fn replicate_command(&self, command: protocol::Command) { - if !self.cluster_formed.load(Ordering::SeqCst) { - return; - } - - let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); - - let guard = epoch::pin(); - let nodes: Vec<_> = self.nodes.iter(&guard) - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(_, v)| v.clone()) - .collect(); - - for node in nodes { - if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id { - continue; - } - - let node_addr = node.address.clone(); - let cmd_clone = command.clone(); - let seq_clone = sequence; - - tokio::spawn(async move { - if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { - eprintln!("Failed to replicate to {}: {}", node_addr, e); - } - }); - } - } - - async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { - let mut stream = match tokio::net::TcpStream::connect(node).await { - Ok(stream) => stream, - Err(e) => { - eprintln!("Failed to connect to {}: {}", node, e); - return Ok(()); - } - }; - - let message = protocol::ReplicationMessage { - sequence, - command: command.clone(), - timestamp: chrono::Utc::now().timestamp(), - }; - - let bytes = protocol::serialize(&message)?; - - if let Err(e) = stream.write_all(&bytes).await { - eprintln!("Failed to send command to {}: {}", node, e); - } - - Ok(()) - } - - async fn sync_with_nodes(&self) { - if !self.cluster_formed.load(Ordering::SeqCst) { - return; - } - - let guard = epoch::pin(); - let node_count = self.nodes.len(&guard); - println!("Starting sync with {} nodes", node_count); - - let nodes: Vec = self.nodes.iter(&guard) // ИСПРАВЛЕНО: Явно указан тип String - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(_, v)| v.address.clone()) - .collect(); - - for node_addr in nodes { - tokio::spawn(async move { - if let Err(e) = Self::sync_with_node(&node_addr).await { - eprintln!("Failed to sync with {}: {}", node_addr, e); - } - }); - } - } - - async fn sync_with_node(_node: &str) -> Result<()> { - Ok(()) - } - - async fn send_heartbeat(&self) -> Result<()> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Ok(()); - } - - let guard = epoch::pin(); - let nodes: Vec<_> = self.nodes.iter(&guard) - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(_, v)| v.clone()) - .collect(); - - for node in nodes { - if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id { - let node_addr = node.address.clone(); - tokio::spawn(async move { - if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await { - eprintln!("Heartbeat failed for {}: {}", node_addr, e); - } - }); - } - } - Ok(()) - } - - async fn send_heartbeat_to_node(node: &str) -> Result<()> { - let mut stream = match tokio::net::TcpStream::connect(node).await { - Ok(stream) => stream, - Err(e) => { - eprintln!("Failed to connect to {} for heartbeat: {}", node, e); - return Ok(()); - } - }; - - let heartbeat = protocol::ReplicationMessage { - sequence: 0, - command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, - timestamp: chrono::Utc::now().timestamp(), - }; - - let bytes = protocol::serialize(&heartbeat)?; - - if let Err(e) = stream.write_all(&bytes).await { - eprintln!("Failed to send heartbeat to {}: {}", node, e); - } - - Ok(()) - } - - async fn handle_vote_request(&self, term: u64, candidate_id: String) { - let current_term = self.current_term.load(Ordering::SeqCst); - - if term > current_term { - self.current_term.store(term, Ordering::SeqCst); - - let guard = epoch::pin(); - let current = self.voted_for.load(Ordering::Acquire, &guard); - let mut new_map = HashMap::new(); - - if let Some(ref map) = unsafe { current.as_ref() } { - new_map.extend(map.iter().map(|(k, v)| (*k, v.clone()))); - } - - new_map.insert(term, candidate_id.clone()); - - let new_ptr = Owned::new(new_map); - self.voted_for.store(new_ptr, Ordering::Release); - } - } - - async fn handle_vote_response(&self, term: u64, vote_granted: bool) { - if vote_granted && term == self.current_term.load(Ordering::SeqCst) { - // Подсчет голосов и переход в лидеры - let guard = epoch::pin(); - let node_count = self.nodes.len(&guard); - - if node_count >= self.min_nodes_for_cluster { - // При получении большинства голосов становимся лидером - match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) { - Ok(_) => { - self.is_leader.store(true, Ordering::SeqCst); - println!("Elected as leader for term {}", term); - } - Err(_) => { - // Кто-то уже стал лидером - } - } - } - } - } - - async fn handle_append_entries(&self, term: u64, leader_id: String) { - let current_term = self.current_term.load(Ordering::SeqCst); - - if term >= current_term { - self.current_term.store(term, Ordering::SeqCst); - - // Atomic изменение состояния - match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) { - Ok(_) => { - self.is_leader.store(false, Ordering::SeqCst); - - let guard = epoch::pin(); - if let Some(mut node) = self.nodes.get(&self.node_id, &guard) { - node.raft_info.state = RaftState::Follower; - node.raft_info.term = term; - node.raft_info.last_heartbeat = chrono::Utc::now().timestamp(); - - self.nodes.insert(self.node_id.clone(), node, &guard); - } - } - Err(_) => { - // Уже в другом состоянии - } - } - } - } - - pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { - let raft_node = RaftNode { - node_id: node_id.clone(), - address: address.clone(), - state: RaftState::Follower, - term: 0, - voted_for: None, - last_heartbeat: chrono::Utc::now().timestamp(), - }; - - let node = ShardNode { - node_id: node_id.clone(), - address, - capacity, - used: 0, - collections: Vec::new(), - raft_info: raft_node, - }; - - let guard = epoch::pin(); - self.nodes.insert(node_id, node, &guard); - - let node_count = self.nodes.len(&guard); - if node_count >= self.min_nodes_for_cluster { - self.cluster_formed.store(true, Ordering::SeqCst); - println!("Cluster formed with {} nodes (minimum required: {})", - node_count, self.min_nodes_for_cluster); - } - - Ok(()) - } - - pub fn remove_node(&self, node_id: &str) -> Result<()> { - let guard = epoch::pin(); - self.nodes.remove(node_id, &guard); - - let node_count = self.nodes.len(&guard); - if node_count < self.min_nodes_for_cluster { - self.cluster_formed.store(false, Ordering::SeqCst); - self.is_leader.store(false, Ordering::SeqCst); - println!("Cluster no longer formed. Have {} nodes (need {})", - node_count, self.min_nodes_for_cluster); - } - - Ok(()) - } - - pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::ShardingError( - format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.", - self.min_nodes_for_cluster) - )); - } - - let sharding = CollectionSharding { - shard_key: shard_key.to_string(), - virtual_nodes: self.virtual_nodes_per_node, - ring: BTreeMap::new(), - }; - - let guard = epoch::pin(); - let current = self.collections.load(Ordering::Acquire, &guard); - let mut new_map = HashMap::new(); - - if let Some(ref map) = unsafe { current.as_ref() } { - new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone()))); - } - - new_map.insert(collection.to_string(), sharding); - - let new_ptr = Owned::new(new_map); - self.collections.store(new_ptr, Ordering::Release); - - self.rebuild_ring(collection)?; - Ok(()) - } - - fn rebuild_ring(&self, collection: &str) -> Result<()> { - let guard = epoch::pin(); - let collections_current = self.collections.load(Ordering::Acquire, &guard); - - if let Some(collections_map) = unsafe { collections_current.as_ref() } { - if let Some(mut sharding) = collections_map.get(collection).cloned() { - sharding.ring.clear(); - - let nodes: Vec = self.nodes.iter(&guard) // ИСПРАВЛЕНО: Явно указан тип String - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(k, _)| k.clone()) - .collect(); - - for node_id in nodes { - for i in 0..sharding.virtual_nodes { - let key = format!("{}-{}", node_id, i); - let hash = self.hash_key(&key); - sharding.ring.insert(hash, node_id.clone()); - } - } - - // Обновляем коллекцию - let mut new_map = HashMap::new(); - new_map.extend(collections_map.iter().map(|(k, v)| { - if k == collection { - (k.clone(), sharding.clone()) - } else { - (k.clone(), v.clone()) - } - })); - - let new_ptr = Owned::new(new_map); - self.collections.store(new_ptr, Ordering::Release); - } - } - - Ok(()) - } - - fn hash_key(&self, key: &str) -> u64 { - let mut hasher = SipHasher13::new(); - key.hash(&mut hasher); - hasher.finish() - } - - pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::ShardingError( - format!("Cannot find node: cluster not formed. Need at least {} nodes.", - self.min_nodes_for_cluster) - )); - } - - let guard = epoch::pin(); - let collections_current = self.collections.load(Ordering::Acquire, &guard); - - if let Some(collections_map) = unsafe { collections_current.as_ref() } { - if let Some(sharding) = collections_map.get(collection) { - let key_hash = self.hash_key(key_value); - - let mut range = sharding.ring.range(key_hash..); - if let Some((_, node_id)) = range.next() { - return Ok(Some(node_id.clone())); - } - - if let Some((_, node_id)) = sharding.ring.iter().next() { - return Ok(Some(node_id.clone())); - } - } - } - - Ok(None) - } - - pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::ShardingError( - format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.", - self.min_nodes_for_cluster) - )); - } - - let guard = epoch::pin(); - - if self.nodes.get(from_node, &guard).is_none() { - return Err(crate::common::FutriixError::ShardingError( - format!("Source node '{}' not found in cluster", from_node) - )); - } - - if self.nodes.get(to_node, &guard).is_none() { - return Err(crate::common::FutriixError::ShardingError( - format!("Destination node '{}' not found in cluster", to_node) - )); - } - - println!("Migrating shard for collection '{}' from {} to {} with key {}", - collection, from_node, to_node, shard_key); - - self.rebuild_ring(collection)?; - Ok(()) - } - - pub fn rebalance_cluster(&self) -> Result<()> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::ShardingError( - format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.", - self.min_nodes_for_cluster) - )); - } - - let guard = epoch::pin(); - let node_count = self.nodes.len(&guard); - println!("Rebalancing cluster with {} nodes", node_count); - - let collections_current = self.collections.load(Ordering::Acquire, &guard); - let mut new_collections = HashMap::new(); - - if let Some(collections_map) = unsafe { collections_current.as_ref() } { - for (name, sharding) in collections_map { - let mut new_sharding = sharding.clone(); - new_sharding.ring.clear(); - - let nodes: Vec = self.nodes.iter(&guard) // ИСПРАВЛЕНО: Явно указан тип String - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(k, _)| k.clone()) - .collect(); - - for node_id in nodes { - for i in 0..new_sharding.virtual_nodes { - let key = format!("{}-{}", node_id, i); - let hash = self.hash_key(&key); - new_sharding.ring.insert(hash, node_id.clone()); - } - } - - new_collections.insert(name.clone(), new_sharding); - } - } - - let new_ptr = Owned::new(new_collections); - self.collections.store(new_ptr, Ordering::Release); - - self.rebalance_nodes(&guard)?; - - Ok(()) - } - - fn rebalance_nodes(&self, guard: &Guard) -> Result<()> { - println!("Rebalancing nodes in cluster..."); - - let mut total_capacity = 0; - let mut total_used = 0; - let mut nodes_info = Vec::new(); - - for (_, node) in self.nodes.iter(guard) { - total_capacity += node.capacity; - total_used += node.used; - nodes_info.push((node.node_id.clone(), node.used, node.capacity)); - } - - let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; - - println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); - - let mut overloaded_nodes = Vec::new(); - let mut underloaded_nodes = Vec::new(); - - for (node_id, used, capacity) in nodes_info { - let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 }; - - if usage > avg_usage * 1.2 { - overloaded_nodes.push((node_id, usage)); - } else if usage < avg_usage * 0.8 { - underloaded_nodes.push((node_id, usage)); - } - } - - println!("Overloaded nodes: {}", overloaded_nodes.len()); - println!("Underloaded nodes: {}", underloaded_nodes.len()); - - Ok(()) - } - - pub fn get_cluster_status(&self) -> Result { - let guard = epoch::pin(); - - let mut cluster_nodes = Vec::new(); - let mut total_capacity = 0; - let mut total_used = 0; - let mut raft_nodes = Vec::new(); - - for (_, node) in self.nodes.iter(&guard) { - total_capacity += node.capacity; - total_used += node.used; - - cluster_nodes.push(protocol::ShardInfo { - node_id: node.node_id.clone(), - address: node.address.clone(), - capacity: node.capacity, - used: node.used, - collections: node.collections.clone(), - }); - - raft_nodes.push(protocol::RaftNodeInfo { - node_id: node.node_id.clone(), - address: node.address.clone(), - state: match node.raft_info.state { - RaftState::Leader => "leader".to_string(), - RaftState::Follower => "follower".to_string(), - RaftState::Candidate => "candidate".to_string(), - }, - term: node.raft_info.term, - last_heartbeat: node.raft_info.last_heartbeat, - }); - } - - let rebalance_needed = { - if total_capacity == 0 { - false - } else { - let avg_usage = total_used as f64 / total_capacity as f64; - let mut needs_rebalance = false; - - for (_, node) in self.nodes.iter(&guard) { - let usage = if node.capacity > 0 { - node.used as f64 / node.capacity as f64 - } else { - 0.0 - }; - - if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 { - needs_rebalance = true; - break; - } - } - - needs_rebalance - } - }; - - Ok(protocol::ClusterStatus { - nodes: cluster_nodes, - total_capacity, - total_used, - rebalance_needed, - cluster_formed: self.cluster_formed.load(Ordering::SeqCst), - leader_exists: self.is_leader.load(Ordering::SeqCst), - raft_nodes, - }) - } - - pub fn get_raft_nodes(&self) -> Vec { - let guard = epoch::pin(); - self.nodes.iter(&guard) - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(_, node)| node.raft_info.clone()) - .collect() - } - - pub fn is_cluster_formed(&self) -> bool { - self.cluster_formed.load(Ordering::SeqCst) - } - - pub fn start_election(&self) -> Result<()> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::ShardingError( - format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster) - )); - } - - let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; - println!("Starting election for term {}", new_term); - - self.is_leader.store(false, Ordering::SeqCst); - - // Atomic переход в состояние candidate - match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) { - Ok(_) => { - let guard = epoch::pin(); - if let Some(mut node) = self.nodes.get(&self.node_id, &guard) { - node.raft_info.state = RaftState::Candidate; - node.raft_info.term = new_term; - node.raft_info.voted_for = Some(self.node_id.clone()); - - self.nodes.insert(self.node_id.clone(), node, &guard); - } - - self.replication_queue.push(ReplicationEvent::RaftVoteRequest { - term: new_term, - candidate_id: self.node_id.clone(), - }); - } - Err(current_state) => { - println!("Already in state {:?}, cannot start election", current_state); - } - } - - Ok(()) - } - - pub async fn replicate(&self, command: protocol::Command) -> Result<()> { - if !self.replication_enabled.load(Ordering::SeqCst) { - return Ok(()); - } - - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::ShardingError( - "Cannot replicate: cluster not formed".to_string() - )); - } - - self.replication_queue.push(ReplicationEvent::Command(command)); - Ok(()) - } - - pub async fn request_sync(&self) -> Result<()> { - if !self.replication_enabled.load(Ordering::SeqCst) { - return Ok(()); - } - - self.replication_queue.push(ReplicationEvent::SyncRequest); - Ok(()) - } - - pub fn get_nodes(&self) -> Vec { - let guard = epoch::pin(); - self.nodes.iter(&guard) - .into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter() - .map(|(_, node)| node.clone()) - .collect() - } - - pub fn get_sequence_number(&self) -> u64 { - self.sequence_number.load(Ordering::SeqCst) - } - - pub fn is_replication_enabled(&self) -> bool { - self.replication_enabled.load(Ordering::SeqCst) - } - - pub fn get_node(&self, node_id: &str) -> Option { - let guard = epoch::pin(); - self.nodes.get(node_id, &guard) - } - - pub fn get_node_id(&self) -> &str { - &self.node_id - } -}