diff --git a/src/server/sharding.rs b/src/server/sharding.rs index 55895a3..a0c1577 100644 --- a/src/server/sharding.rs +++ b/src/server/sharding.rs @@ -32,32 +32,36 @@ use crate::common::protocol; /// Состояния узла в Raft протоколе #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum RaftState { - Follower, - Candidate, - Leader, + Follower, // Подчиненный узел, получает команды от лидера + Candidate, // Кандидат на выборах лидера + Leader, // Лидер, координирует операции репликации } /// Atomic Raft состояние для атомарных операций +/// Обеспечивает thread-safe доступ к состоянию Raft узла struct AtomicRaftState { - inner: AtomicU64, + inner: AtomicU64, // Внутреннее атомарное представление состояния } impl AtomicRaftState { + /// Создает новое атомарное состояние Raft fn new() -> Self { Self { - inner: AtomicU64::new(0), + inner: AtomicU64::new(0), // Начальное состояние: Follower } } + /// Получает текущее состояние Raft fn get(&self) -> RaftState { match self.inner.load(Ordering::Acquire) { 0 => RaftState::Follower, 1 => RaftState::Candidate, 2 => RaftState::Leader, - _ => RaftState::Follower, + _ => RaftState::Follower, // Значение по умолчанию при ошибке } } + /// Устанавливает новое состояние Raft fn set(&self, state: RaftState) { let value = match state { RaftState::Follower => 0, @@ -67,6 +71,8 @@ impl AtomicRaftState { self.inner.store(value, Ordering::Release); } + /// Атомарное сравнение и обмен состояния Raft + /// Возвращает Ok(new) если операция успешна, иначе Err(actual_state) fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result { let current_val = match current { RaftState::Follower => 0, @@ -98,52 +104,53 @@ impl AtomicRaftState { /// Информация о Raft узле #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RaftNode { - pub node_id: String, - pub address: String, - pub state: RaftState, - pub term: u64, - pub voted_for: Option, - pub last_heartbeat: i64, + pub node_id: String, // Уникальный идентификатор узла + pub address: String, // Сетевой адрес узла + pub state: RaftState, // Текущее состояние узла + pub term: u64, // Текущий термин Raft + pub voted_for: Option, // За кого голосовал узел в текущем терме + pub last_heartbeat: i64, // Время последнего heartbeat (timestamp) } /// Информация о шард-узле с Raft #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShardNode { - pub node_id: String, - pub address: String, - pub capacity: u64, - pub used: u64, - pub collections: Vec, - pub raft_info: RaftNode, + pub node_id: String, // Уникальный идентификатор узла + pub address: String, // Сетевой адрес узла + pub capacity: u64, // Общая емкость узла в байтах + pub used: u64, // Использованная емкость в байтах + pub collections: Vec, // Коллекции, хранящиеся на узле + pub raft_info: RaftNode, // Информация о состоянии Raft узла } /// Состояние шардинга для коллекции #[derive(Debug, Clone)] pub struct CollectionSharding { - pub shard_key: String, - pub virtual_nodes: usize, - // Используем Arc для совместного доступа из нескольких потоков - pub ring: Arc>, + pub shard_key: String, // Ключ для шардинга + pub virtual_nodes: usize, // Количество виртуальных узлов на физический узел + pub ring: Arc>, // Консистентное хэш-кольцо для распределения данных } -/// События репликации +/// События репликации для обработки в очереди #[derive(Debug, Serialize, Deserialize)] pub enum ReplicationEvent { - Command(protocol::Command), - SyncRequest, - Heartbeat, - RaftVoteRequest { term: u64, candidate_id: String }, - RaftVoteResponse { term: u64, vote_granted: bool }, - RaftAppendEntries { term: u64, leader_id: String }, + Command(protocol::Command), // Команда для репликации + SyncRequest, // Запрос синхронизации + Heartbeat, // Heartbeat сообщение + RaftVoteRequest { term: u64, candidate_id: String }, // Запрос голоса в Raft + RaftVoteResponse { term: u64, vote_granted: bool }, // Ответ на запрос голоса + RaftAppendEntries { term: u64, leader_id: String }, // Сообщение о добавлении записей от лидера } /// Lock-Free очередь репликации на основе SegQueue +/// Обеспечивает безопасный доступ из нескольких потоков без блокировок struct LockFreeReplicationQueue { - queue: SegQueue, - size: AtomicUsize, + queue: SegQueue, // Сегментированная lock-free очередь + size: AtomicUsize, // Атомарный счетчик размера очереди } impl LockFreeReplicationQueue { + /// Создает новую lock-free очередь fn new() -> Self { Self { queue: SegQueue::new(), @@ -151,11 +158,13 @@ impl LockFreeReplicationQueue { } } + /// Добавляет событие в очередь fn push(&self, event: ReplicationEvent) { self.queue.push(event); self.size.fetch_add(1, Ordering::SeqCst); } + /// Извлекает событие из очереди fn pop(&self) -> Option { let event = self.queue.pop(); if event.is_some() { @@ -164,36 +173,38 @@ impl LockFreeReplicationQueue { event } + /// Возвращает текущий размер очереди fn len(&self) -> usize { self.size.load(Ordering::Acquire) } } /// Менеджер шардинга и репликации с Raft +/// Координирует распределение данных по узлам кластера и обеспечивает консенсус #[derive(Clone)] pub struct ShardingManager { // Шардинг компоненты - nodes: Arc>, - // Используем DashMap для атомарного доступа к настройкам шардинга коллекций - collections: Arc>, - virtual_nodes_per_node: usize, - min_nodes_for_cluster: usize, + nodes: Arc>, // Узлы кластера + collections: Arc>, // Настройки шардинга для коллекций + virtual_nodes_per_node: usize, // Виртуальных узлов на физический узел + min_nodes_for_cluster: usize, // Минимальное количество узлов для формирования кластера // Raft компоненты с atomic операциями - current_term: Arc, - voted_for: Arc>, - is_leader: Arc, - raft_state: Arc, - cluster_formed: Arc, + current_term: Arc, // Текущий термин Raft + voted_for: Arc>, // Голоса по терминам + is_leader: Arc, // Флаг лидера + raft_state: Arc, // Текущее состояние Raft + cluster_formed: Arc, // Флаг сформированного кластера // Репликация компоненты - replication_queue: Arc, - sequence_number: Arc, - replication_enabled: Arc, - node_id: String, + replication_queue: Arc, // Очередь репликации + sequence_number: Arc, // Номер последовательности для упорядочивания + replication_enabled: Arc, // Флаг включенной репликации + node_id: String, // Идентификатор текущего узла } impl ShardingManager { + /// Создает новый менеджер шардинга pub fn new( virtual_nodes_per_node: usize, replication_enabled: bool, @@ -233,6 +244,8 @@ impl ShardingManager { manager } + /// Основной цикл репликации и выборов Raft + /// Обрабатывает heartbeat, таймауты выборов и события из очереди async fn run_replication_loop(self) { let mut heartbeat_interval = interval(Duration::from_millis(1000)); let mut election_timeout = interval(Duration::from_millis(5000)); @@ -240,6 +253,7 @@ impl ShardingManager { loop { tokio::select! { _ = heartbeat_interval.tick() => { + // Отправка heartbeat если текущий узел - лидер if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) && self.cluster_formed.load(Ordering::SeqCst) { @@ -247,6 +261,7 @@ impl ShardingManager { } } _ = election_timeout.tick() => { + // Запуск выборов если текущий узел не лидер if !self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) && self.cluster_formed.load(Ordering::SeqCst) { @@ -254,6 +269,7 @@ impl ShardingManager { } } _ = tokio::time::sleep(Duration::from_millis(10)) => { + // Обработка событий из очереди репликации while let Some(event) = self.replication_queue.pop() { self.handle_replication_event(event).await; } @@ -262,6 +278,7 @@ impl ShardingManager { } } + /// Обработка события репликации async fn handle_replication_event(&self, event: ReplicationEvent) { if !self.replication_enabled.load(Ordering::SeqCst) { return; @@ -289,17 +306,21 @@ impl ShardingManager { } } + /// Репликация команды на все узлы кластера async fn replicate_command(&self, command: protocol::Command) { if !self.cluster_formed.load(Ordering::SeqCst) { return; } + // Генерируем уникальный номер последовательности let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); + // Получаем список всех узлов let nodes: Vec = self.nodes.iter() .map(|entry| entry.value().clone()) .collect(); + // Отправляем команду на все узлы кроме себя (если лидер) for node in nodes { if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id { continue; @@ -309,6 +330,7 @@ impl ShardingManager { let cmd_clone = command.clone(); let seq_clone = sequence; + // Асинхронная отправка на каждый узел tokio::spawn(async move { if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { eprintln!("Failed to replicate to {}: {}", node_addr, e); @@ -317,9 +339,11 @@ impl ShardingManager { } } + /// Отправка команды на конкретный узел async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { use crate::common::protocol::{ReplicationMessage, serialize}; + // Устанавливаем соединение с узлом let mut stream = match tokio::net::TcpStream::connect(node).await { Ok(stream) => stream, Err(e) => { @@ -328,12 +352,14 @@ impl ShardingManager { } }; + // Создаем сообщение репликации let message = ReplicationMessage { sequence, command: command.clone(), timestamp: chrono::Utc::now().timestamp(), }; + // Сериализуем сообщение let bytes = match serialize(&message) { Ok(b) => b, Err(e) => { @@ -342,6 +368,7 @@ impl ShardingManager { } }; + // Отправляем данные if let Err(e) = stream.write_all(&bytes).await { eprintln!("Failed to send command to {}: {}", node, e); } @@ -349,6 +376,7 @@ impl ShardingManager { Ok(()) } + /// Синхронизация со всеми узлами кластера async fn sync_with_nodes(&self) { if !self.cluster_formed.load(Ordering::SeqCst) { return; @@ -370,11 +398,14 @@ impl ShardingManager { } } + /// Синхронизация с конкретным узлом async fn sync_with_node(_node: &str) -> Result<()> { // Заглушка для синхронизации + // В реальной реализации здесь будет обмен метаданными и данными Ok(()) } + /// Отправка heartbeat сообщений на все подчиненные узлы async fn send_heartbeat(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Ok(()); @@ -384,6 +415,7 @@ impl ShardingManager { .map(|entry| entry.value().clone()) .collect(); + // Отправляем heartbeat всем подчиненным узлам for node in nodes { if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id { let node_addr = node.address.clone(); @@ -397,6 +429,7 @@ impl ShardingManager { Ok(()) } + /// Отправка heartbeat на конкретный узел async fn send_heartbeat_to_node(node: &str) -> Result<()> { use crate::common::protocol::{ReplicationMessage, serialize}; @@ -408,6 +441,7 @@ impl ShardingManager { } }; + // Создаем heartbeat сообщение let heartbeat = ReplicationMessage { sequence: 0, command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, @@ -429,22 +463,27 @@ impl ShardingManager { Ok(()) } + /// Обработка запроса голоса в Raft async fn handle_vote_request(&self, term: u64, candidate_id: String) { let current_term = self.current_term.load(Ordering::SeqCst); if term > current_term { + // Обновляем термин и голосуем за кандидата self.current_term.store(term, Ordering::SeqCst); self.voted_for.insert(term, candidate_id); } } + /// Обработка ответа на запрос голоса в Raft async fn handle_vote_response(&self, term: u64, vote_granted: bool) { if vote_granted && term == self.current_term.load(Ordering::SeqCst) { let node_count = self.nodes.len(); + // Проверяем, получено ли большинство голосов if node_count >= self.min_nodes_for_cluster { match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) { Ok(_) => { + // Становимся лидером self.is_leader.store(true, Ordering::SeqCst); println!("Elected as leader for term {}", term); } @@ -454,16 +493,19 @@ impl ShardingManager { } } + /// Обработка сообщения AppendEntries от лидера async fn handle_append_entries(&self, term: u64, leader_id: String) { let current_term = self.current_term.load(Ordering::SeqCst); if term >= current_term { + // Принимаем лидера и переходим в состояние follower self.current_term.store(term, Ordering::SeqCst); match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) { Ok(_) => { self.is_leader.store(false, Ordering::SeqCst); + // Обновляем информацию о состоянии узла if let Some(mut node) = self.nodes.get_mut(&self.node_id) { node.raft_info.state = RaftState::Follower; node.raft_info.term = term; @@ -475,7 +517,9 @@ impl ShardingManager { } } + /// Добавление узла в кластер pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { + // Создаем информацию о Raft узле let raft_node = RaftNode { node_id: node_id.clone(), address: address.clone(), @@ -485,6 +529,7 @@ impl ShardingManager { last_heartbeat: chrono::Utc::now().timestamp(), }; + // Создаем информацию о шард-узле let node = ShardNode { node_id: node_id.clone(), address, @@ -494,8 +539,10 @@ impl ShardingManager { raft_info: raft_node, }; + // Добавляем узел в кластер self.nodes.insert(node_id, node); + // Проверяем, сформирован ли кластер let node_count = self.nodes.len(); if node_count >= self.min_nodes_for_cluster { self.cluster_formed.store(true, Ordering::SeqCst); @@ -506,9 +553,11 @@ impl ShardingManager { Ok(()) } + /// Удаление узла из кластера pub fn remove_node(&self, node_id: &str) -> Result<()> { self.nodes.remove(node_id); + // Проверяем, остался ли кластер сформированным let node_count = self.nodes.len(); if node_count < self.min_nodes_for_cluster { self.cluster_formed.store(false, Ordering::SeqCst); @@ -520,6 +569,7 @@ impl ShardingManager { Ok(()) } + /// Настройка шардинга для коллекции pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( @@ -528,6 +578,7 @@ impl ShardingManager { )); } + // Создаем настройки шардинга для коллекции let sharding = CollectionSharding { shard_key: shard_key.to_string(), virtual_nodes: self.virtual_nodes_per_node, @@ -536,21 +587,25 @@ impl ShardingManager { self.collections.insert(collection.to_string(), sharding); + // Перестраиваем хэш-кольцо для коллекции self.rebuild_ring(collection)?; Ok(()) } + /// Перестроение хэш-кольца для коллекции fn rebuild_ring(&self, collection: &str) -> Result<()> { if let Some(mut entry) = self.collections.get_mut(collection) { let sharding = entry.value_mut(); - // Очищаем ring + // Очищаем существующее кольцо sharding.ring.clear(); + // Получаем список всех узлов let nodes: Vec = self.nodes.iter() .map(|node_entry| node_entry.key().clone()) .collect(); + // Добавляем виртуальные узлы для каждого физического узла for node_id in nodes { for i in 0..sharding.virtual_nodes { let key = format!("{}-{}", node_id, i); @@ -563,12 +618,14 @@ impl ShardingManager { Ok(()) } + /// Хэширование ключа для консистентного хэширования fn hash_key(&self, key: &str) -> u64 { let mut hasher = SipHasher13::new(); key.hash(&mut hasher); hasher.finish() } + /// Поиск узла для ключа в указанной коллекции pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( @@ -578,6 +635,7 @@ impl ShardingManager { } if let Some(sharding) = self.collections.get(collection) { + // Вычисляем хэш ключа let key_hash = self.hash_key(key_value); // Собираем все записи в вектор @@ -595,7 +653,7 @@ impl ShardingManager { } } - // Если не нашли, возвращаем первый узел + // Если не нашли, возвращаем первый узел (циклический переход) if let Some((_, node_id)) = entries.first() { return Ok(Some(node_id.clone())); } @@ -604,6 +662,7 @@ impl ShardingManager { Ok(None) } + /// Миграция шарда с одного узла на другой pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( @@ -612,12 +671,14 @@ impl ShardingManager { )); } + // Проверяем существование исходного узла if !self.nodes.contains_key(from_node) { return Err(crate::common::FutriixError::ShardingError( format!("Source node '{}' not found in cluster", from_node) )); } + // Проверяем существование целевого узла if !self.nodes.contains_key(to_node) { return Err(crate::common::FutriixError::ShardingError( format!("Destination node '{}' not found in cluster", to_node) @@ -627,10 +688,12 @@ impl ShardingManager { println!("Migrating shard for collection '{}' from {} to {} with key {}", collection, from_node, to_node, shard_key); + // Перестраиваем кольцо для отражения миграции self.rebuild_ring(collection)?; Ok(()) } + /// Ребалансировка кластера pub fn rebalance_cluster(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( @@ -642,16 +705,18 @@ impl ShardingManager { let node_count = self.nodes.len(); println!("Rebalancing cluster with {} nodes", node_count); - // Перестраиваем все rings + // Перестраиваем все хэш-кольца for key in self.collections.iter().map(|entry| entry.key().clone()).collect::>() { self.rebuild_ring(&key)?; } + // Ребалансируем узлы self.rebalance_nodes()?; Ok(()) } + /// Ребалансировка нагрузки между узлами fn rebalance_nodes(&self) -> Result<()> { println!("Rebalancing nodes in cluster..."); @@ -659,25 +724,31 @@ impl ShardingManager { let mut total_used = 0; let mut nodes_info = Vec::new(); + // Собираем статистику по всем узлам for node in self.nodes.iter() { total_capacity += node.capacity; total_used += node.used; nodes_info.push((node.node_id.clone(), node.used, node.capacity)); } + // Вычисляем среднюю загрузку let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); + // Определяем перегруженные и недогруженные узлы let mut overloaded_nodes = Vec::new(); let mut underloaded_nodes = Vec::new(); for (node_id, used, capacity) in nodes_info { let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 }; + // Узлы с загрузкой > 120% от средней считаем перегруженными if usage > avg_usage * 1.2 { overloaded_nodes.push((node_id, usage)); - } else if usage < avg_usage * 0.8 { + } + // Узлы с загрузкой < 80% от средней считаем недогруженными + else if usage < avg_usage * 0.8 { underloaded_nodes.push((node_id, usage)); } } @@ -685,15 +756,20 @@ impl ShardingManager { println!("Overloaded nodes: {}", overloaded_nodes.len()); println!("Underloaded nodes: {}", underloaded_nodes.len()); + // В реальной реализации здесь была бы логика миграции данных + // между перегруженными и недогруженными узлами + Ok(()) } + /// Получение статуса кластера pub fn get_cluster_status(&self) -> Result { let mut cluster_nodes = Vec::new(); let mut total_capacity = 0; let mut total_used = 0; let mut raft_nodes = Vec::new(); + // Собираем информацию о всех узлах for node in self.nodes.iter() { total_capacity += node.capacity; total_used += node.used; @@ -719,6 +795,7 @@ impl ShardingManager { }); } + // Определяем, нужна ли ребалансировка let rebalance_needed = { if total_capacity == 0 { false @@ -733,6 +810,8 @@ impl ShardingManager { 0.0 }; + // Помечаем кластер как нуждающийся в ребалансировке, + // если есть узлы с загрузкой > 120% или < 80% от средней if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 { needs_rebalance = true; break; @@ -743,6 +822,7 @@ impl ShardingManager { } }; + // Формируем статус кластера Ok(protocol::ClusterStatus { nodes: cluster_nodes, total_capacity, @@ -754,16 +834,19 @@ impl ShardingManager { }) } + /// Получение списка Raft узлов pub fn get_raft_nodes(&self) -> Vec { self.nodes.iter() .map(|node| node.raft_info.clone()) .collect() } + /// Проверка, сформирован ли кластер pub fn is_cluster_formed(&self) -> bool { self.cluster_formed.load(Ordering::SeqCst) } + /// Запуск выборов лидера pub fn start_election(&self) -> Result<()> { if !self.cluster_formed.load(Ordering::SeqCst) { return Err(crate::common::FutriixError::ShardingError( @@ -771,19 +854,23 @@ impl ShardingManager { )); } + // Увеличиваем номер термина let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; println!("Starting election for term {}", new_term); self.is_leader.store(false, Ordering::SeqCst); + // Переходим в состояние кандидата match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) { Ok(_) => { + // Обновляем информацию о текущем узле if let Some(mut node) = self.nodes.get_mut(&self.node_id) { node.raft_info.state = RaftState::Candidate; node.raft_info.term = new_term; node.raft_info.voted_for = Some(self.node_id.clone()); } + // Отправляем запросы на голосование self.replication_queue.push(ReplicationEvent::RaftVoteRequest { term: new_term, candidate_id: self.node_id.clone(), @@ -797,6 +884,7 @@ impl ShardingManager { Ok(()) } + /// Репликация команды pub async fn replicate(&self, command: protocol::Command) -> Result<()> { if !self.replication_enabled.load(Ordering::SeqCst) { return Ok(()); @@ -808,10 +896,12 @@ impl ShardingManager { )); } + // Добавляем команду в очередь репликации self.replication_queue.push(ReplicationEvent::Command(command)); Ok(()) } + /// Запрос синхронизации pub async fn request_sync(&self) -> Result<()> { if !self.replication_enabled.load(Ordering::SeqCst) { return Ok(()); @@ -821,25 +911,30 @@ impl ShardingManager { Ok(()) } + /// Получение списка всех узлов pub fn get_nodes(&self) -> Vec { self.nodes.iter() .map(|node| node.clone()) .collect() } + /// Получение текущего номера последовательности pub fn get_sequence_number(&self) -> u64 { self.sequence_number.load(Ordering::SeqCst) } + /// Проверка, включена ли репликация pub fn is_replication_enabled(&self) -> bool { self.replication_enabled.load(Ordering::SeqCst) } + /// Получение информации об узле по ID pub fn get_node(&self, node_id: &str) -> Option { self.nodes.get(node_id).map(|entry| entry.clone()) } + /// Получение ID текущего узла pub fn get_node_id(&self) -> &str { &self.node_id } -} +} \ No newline at end of file