From 21e97b9195d372c9858152357ffd7b2e9dcb52ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Mon, 1 Dec 2025 18:03:37 +0000 Subject: [PATCH] Fixed heartbeat sending bug --- src/server/sharding.rs | 612 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 612 insertions(+) create mode 100644 src/server/sharding.rs diff --git a/src/server/sharding.rs b/src/server/sharding.rs new file mode 100644 index 0000000..1bcaeaf --- /dev/null +++ b/src/server/sharding.rs @@ -0,0 +1,612 @@ +// src/server/sharding.rs +//! Модуль шардинга с консистентным хэшированием и Raft протоколом +//! +//! Объединяет функционал шардинга и репликации с wait-free архитектурой +//! и реализацией Raft консенсуса для работы в production. + +use std::collections::{HashMap, BTreeMap}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use tokio::sync::mpsc; +use tokio::time::{interval, Duration}; +use tokio::io::AsyncWriteExt; +use serde::{Serialize, Deserialize}; +use siphasher::sip::SipHasher13; +use dashmap::DashMap; + +use crate::common::Result; +use crate::common::protocol; + +/// Состояния узла в Raft протоколе +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum RaftState { + Follower, + Candidate, + Leader, +} + +/// Информация о Raft узле +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RaftNode { + pub node_id: String, + pub address: String, + pub state: RaftState, + pub term: u64, + pub voted_for: Option, + pub last_heartbeat: i64, +} + +/// Информация о шард-узле с Raft +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShardNode { + pub node_id: String, + pub address: String, + pub capacity: u64, + pub used: u64, + pub collections: Vec, + pub raft_info: RaftNode, +} + +/// Состояние шардинга для коллекции +#[derive(Debug, Clone)] +pub struct CollectionSharding { + pub shard_key: String, + pub virtual_nodes: usize, + pub ring: BTreeMap, // consistent hash ring +} + +/// События репликации +#[derive(Debug, Serialize, Deserialize)] +pub enum ReplicationEvent { + Command(protocol::Command), + SyncRequest, + Heartbeat, + RaftVoteRequest { term: u64, candidate_id: String }, + RaftVoteResponse { term: u64, vote_granted: bool }, + RaftAppendEntries { term: u64, leader_id: String }, +} + +/// Менеджер шардинга и репликации с Raft +#[derive(Clone)] +pub struct ShardingManager { + // Шардинг компоненты + nodes: Arc>, // Lock-free хранение узлов + collections: Arc>, // Lock-free хранение коллекций + virtual_nodes_per_node: usize, + + // Raft компоненты + current_term: Arc, // Текущий терм Raft + voted_for: Arc>, // Голоса за термы + is_leader: Arc, // Флаг лидера + cluster_formed: Arc, // Флаг сформированности кластера + + // Репликация компоненты + replication_tx: Arc>, + sequence_number: Arc, + replication_enabled: Arc, +} + +impl ShardingManager { + /// Создание нового менеджера шардинга и репликации + pub fn new(virtual_nodes_per_node: usize, replication_enabled: bool) -> Self { + let (tx, rx) = mpsc::channel(1000); // Убрал ненужный mut + + let manager = Self { + nodes: Arc::new(DashMap::new()), + collections: Arc::new(DashMap::new()), + virtual_nodes_per_node, + current_term: Arc::new(AtomicU64::new(0)), + voted_for: Arc::new(DashMap::new()), + is_leader: Arc::new(AtomicBool::new(false)), + cluster_formed: Arc::new(AtomicBool::new(false)), + replication_tx: Arc::new(tx), + sequence_number: Arc::new(AtomicU64::new(0)), + replication_enabled: Arc::new(AtomicBool::new(replication_enabled)), + }; + + // Запуск фоновой задачи обработки репликации и Raft + let manager_clone = manager.clone(); + tokio::spawn(async move { + manager_clone.run_replication_loop(rx).await; + }); + + manager + } + + /// Фоновая задача обработки репликации и Raft + async fn run_replication_loop(self, mut rx: mpsc::Receiver) { + let mut heartbeat_interval = interval(Duration::from_millis(1000)); + + loop { + tokio::select! { + Some(event) = rx.recv() => { + self.handle_replication_event(event).await; + } + _ = heartbeat_interval.tick() => { + // ИСПРАВЛЕНИЕ: Проверяем, что кластер сформирован перед отправкой heartbeat + if self.is_leader.load(Ordering::SeqCst) && + self.replication_enabled.load(Ordering::SeqCst) && + self.cluster_formed.load(Ordering::SeqCst) { + let _ = self.send_heartbeat().await; + } + } + } + } + } + + /// Обработка событий репликации + async fn handle_replication_event(&self, event: ReplicationEvent) { + if !self.replication_enabled.load(Ordering::SeqCst) { + return; + } + + match event { + ReplicationEvent::Command(cmd) => { + self.replicate_command(cmd).await; + } + ReplicationEvent::SyncRequest => { + self.sync_with_nodes().await; + } + ReplicationEvent::Heartbeat => { + let _ = self.send_heartbeat().await; + } + ReplicationEvent::RaftVoteRequest { term, candidate_id } => { + self.handle_vote_request(term, candidate_id).await; + } + ReplicationEvent::RaftVoteResponse { term, vote_granted } => { + self.handle_vote_response(term, vote_granted).await; + } + ReplicationEvent::RaftAppendEntries { term, leader_id } => { + self.handle_append_entries(term, leader_id).await; + } + } + } + + /// Репликация команды на другие узлы + async fn replicate_command(&self, command: protocol::Command) { + let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); + + for entry in self.nodes.iter() { + let node = entry.value(); + // ИСПРАВЛЕНИЕ: Реплицируем на все узлы, кроме текущего лидера + if node.raft_info.state != RaftState::Leader { + let node_addr = node.address.clone(); + let cmd_clone = command.clone(); + let seq_clone = sequence; + + tokio::spawn(async move { + if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { + eprintln!("Failed to replicate to {}: {}", node_addr, e); + } + }); + } + } + } + + /// Отправка команды на удаленный узел + async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { + let mut stream = match tokio::net::TcpStream::connect(node).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("Failed to connect to {}: {}", node, e); + return Ok(()); + } + }; + + let message = protocol::ReplicationMessage { + sequence, + command: command.clone(), + timestamp: chrono::Utc::now().timestamp(), + }; + + let bytes = protocol::serialize(&message)?; + + if let Err(e) = stream.write_all(&bytes).await { + eprintln!("Failed to send command to {}: {}", node, e); + } + + Ok(()) + } + + /// Синхронизация с другими узлами + async fn sync_with_nodes(&self) { + println!("Starting sync with {} nodes", self.nodes.len()); + for entry in self.nodes.iter() { + let node_addr = entry.value().address.clone(); + tokio::spawn(async move { + if let Err(e) = Self::sync_with_node(&node_addr).await { + eprintln!("Failed to sync with {}: {}", node_addr, e); + } + }); + } + } + + /// Синхронизация с удаленным узлом + async fn sync_with_node(_node: &str) -> Result<()> { + // TODO: Реализовать wait-free синхронизацию данных + Ok(()) + } + + /// Отправка heartbeat + async fn send_heartbeat(&self) -> Result<()> { + for entry in self.nodes.iter() { + let node = entry.value(); + // ИСПРАВЛЕНИЕ: Отправляем heartbeat только на follower узлы + if node.raft_info.state == RaftState::Follower { + let node_addr = node.address.clone(); + tokio::spawn(async move { + if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await { + eprintln!("Heartbeat failed for {}: {}", node_addr, e); + } + }); + } + } + Ok(()) + } + + /// Отправка heartbeat на удаленный узел + async fn send_heartbeat_to_node(node: &str) -> Result<()> { + let mut stream = match tokio::net::TcpStream::connect(node).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("Failed to connect to {} for heartbeat: {}", node, e); + return Ok(()); + } + }; + + let heartbeat = protocol::ReplicationMessage { + sequence: 0, + command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, + timestamp: chrono::Utc::now().timestamp(), + }; + + let bytes = protocol::serialize(&heartbeat)?; + + if let Err(e) = stream.write_all(&bytes).await { + eprintln!("Failed to send heartbeat to {}: {}", node, e); + } + + Ok(()) + } + + // Raft методы + /// Обработка запроса голоса + async fn handle_vote_request(&self, term: u64, candidate_id: String) { + let current_term = self.current_term.load(Ordering::SeqCst); + + if term > current_term { + self.current_term.store(term, Ordering::SeqCst); + self.voted_for.insert(term, candidate_id.clone()); + // TODO: Отправить положительный ответ + } + // TODO: Отправить отрицательный ответ если условия не выполнены + } + + /// Обработка ответа голоса + async fn handle_vote_response(&self, term: u64, vote_granted: bool) { + if vote_granted && term == self.current_term.load(Ordering::SeqCst) { + // TODO: Подсчитать голоса и перейти в лидеры при большинстве + } + } + + /// Обработка AppendEntries RPC + async fn handle_append_entries(&self, term: u64, leader_id: String) { + let current_term = self.current_term.load(Ordering::SeqCst); + + if term >= current_term { + self.current_term.store(term, Ordering::SeqCst); + self.is_leader.store(false, Ordering::SeqCst); + // TODO: Обновить состояние follower + } + } + + // Шардинг методы + /// Добавление шард-узла с Raft информацией + pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { + let raft_node = RaftNode { + node_id: node_id.clone(), + address: address.clone(), + state: RaftState::Follower, + term: 0, + voted_for: None, + last_heartbeat: chrono::Utc::now().timestamp(), + }; + + let node = ShardNode { + node_id: node_id.clone(), + address, + capacity, + used: 0, + collections: Vec::new(), + raft_info: raft_node, + }; + + self.nodes.insert(node_id, node); + + // ИЗМЕНЕНИЕ: Проверяем сформированность кластера (минимум 3 узла для шардинга) + if self.nodes.len() >= 3 { + self.cluster_formed.store(true, Ordering::SeqCst); + println!("Cluster formed with {} nodes", self.nodes.len()); + } + + Ok(()) + } + + /// Удаление шард-узла + pub fn remove_node(&self, node_id: &str) -> Result<()> { + self.nodes.remove(node_id); + + // ИЗМЕНЕНИЕ: Проверяем сформированность кластера после удаления (минимум 3 узла) + if self.nodes.len() < 3 { + self.cluster_formed.store(false, Ordering::SeqCst); + self.is_leader.store(false, Ordering::SeqCst); + } + + Ok(()) + } + + /// Настройка шардинга для коллекции + pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { + // Проверка наличия кластера перед настройкой шардинга + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::DatabaseError( + // ИЗМЕНЕНИЕ: Обновлено сообщение об ошибке + "Cannot setup sharding: cluster not formed. Need at least 3 nodes.".to_string() + )); + } + + let sharding = CollectionSharding { + shard_key: shard_key.to_string(), + virtual_nodes: self.virtual_nodes_per_node, + ring: BTreeMap::new(), + }; + + self.collections.insert(collection.to_string(), sharding); + self.rebuild_ring(collection)?; + Ok(()) + } + + /// Перестроение хэш-ринга для коллекции + fn rebuild_ring(&self, collection: &str) -> Result<()> { + if let Some(mut sharding) = self.collections.get_mut(collection) { + sharding.ring.clear(); + + for entry in self.nodes.iter() { + let node_id = entry.key(); + for i in 0..sharding.virtual_nodes { + let key = format!("{}-{}", node_id, i); + let hash = self.hash_key(&key); + sharding.ring.insert(hash, node_id.clone()); + } + } + } + + Ok(()) + } + + /// Хэширование ключа + fn hash_key(&self, key: &str) -> u64 { + let mut hasher = SipHasher13::new(); + key.hash(&mut hasher); + hasher.finish() + } + + /// Поиск узла для ключа + pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { + // Проверка наличия кластера перед поиском узла + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::DatabaseError( + // ИЗМЕНЕНИЕ: Обновлено сообщение об ошибке + "Cannot find node: cluster not formed. Need at least 3 nodes.".to_string() + )); + } + + if let Some(sharding) = self.collections.get(collection) { + let key_hash = self.hash_key(key_value); + + // Поиск в хэш-ринге (консистентное хэширование) + let mut range = sharding.ring.range(key_hash..); + if let Some((_, node_id)) = range.next() { + return Ok(Some(node_id.clone())); + } + + // Если не найдено в верхней части ринга, берем первый узел + if let Some((_, node_id)) = sharding.ring.iter().next() { + return Ok(Some(node_id.clone())); + } + } + + Ok(None) + } + + /// Миграция шарда + pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { + // Проверка наличия кластера перед миграцией + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::DatabaseError( + // ИЗМЕНЕНИЕ: Обновлено сообщение об ошибке + "Cannot migrate shard: cluster not formed. Need at least 3 nodes.".to_string() + )); + } + + println!("Migrating shard for collection '{}' from {} to {} with key {}", + collection, from_node, to_node, shard_key); + + self.rebuild_ring(collection)?; + Ok(()) + } + + /// Ребалансировка кластера + pub fn rebalance_cluster(&self) -> Result<()> { + // Проверка наличия кластера перед ребалансировкой + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::DatabaseError( + // ИЗМЕНЕНИЕ: Обновлено сообщение об ошибке + "Cannot rebalance cluster: cluster not formed. Need at least 3 nodes.".to_string() + )); + } + + println!("Rebalancing cluster with {} nodes", self.nodes.len()); + + // Перестраиваем все хэш-ринги + for mut entry in self.collections.iter_mut() { + let sharding = entry.value_mut(); + sharding.ring.clear(); + + for node_entry in self.nodes.iter() { + let node_id = node_entry.key(); + for i in 0..sharding.virtual_nodes { + let key = format!("{}-{}", node_id, i); + let hash = self.hash_key(&key); + sharding.ring.insert(hash, node_id.clone()); + } + } + } + + // Ребалансировка узлов кластера + self.rebalance_nodes()?; + + Ok(()) + } + + /// Ребалансировка узлов кластера + fn rebalance_nodes(&self) -> Result<()> { + println!("Rebalancing nodes in cluster..."); + + // Рассчитываем среднюю загрузку + let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum(); + let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum(); + let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; + + println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); + + // TODO: Реализовать алгоритм ребалансировки узлов + // - Определить перегруженные и недогруженные узлы + // - Перераспределить данные между узлами + // - Обновить метаданные шардинга + + Ok(()) + } + + /// Получение статуса кластера + pub fn get_cluster_status(&self) -> Result { + let mut cluster_nodes = Vec::new(); + let mut total_capacity = 0; + let mut total_used = 0; + let mut raft_nodes = Vec::new(); + + for entry in self.nodes.iter() { + let node = entry.value(); + total_capacity += node.capacity; + total_used += node.used; + + cluster_nodes.push(protocol::ShardInfo { + node_id: node.node_id.clone(), + address: node.address.clone(), + capacity: node.capacity, + used: node.used, + collections: node.collections.clone(), + }); + + raft_nodes.push(protocol::RaftNodeInfo { + node_id: node.node_id.clone(), + address: node.address.clone(), + state: match node.raft_info.state { + RaftState::Leader => "leader".to_string(), + RaftState::Follower => "follower".to_string(), + RaftState::Candidate => "candidate".to_string(), + }, + term: node.raft_info.term, + last_heartbeat: node.raft_info.last_heartbeat, + }); + } + + Ok(protocol::ClusterStatus { + nodes: cluster_nodes, + total_capacity, + total_used, + rebalance_needed: false, + cluster_formed: self.cluster_formed.load(Ordering::SeqCst), + leader_exists: self.is_leader.load(Ordering::SeqCst), + raft_nodes, + }) + } + + /// Получение списка Raft узлов + pub fn get_raft_nodes(&self) -> Vec { + self.nodes.iter() + .map(|entry| entry.value().raft_info.clone()) + .collect() + } + + /// Проверка сформированности кластера + pub fn is_cluster_formed(&self) -> bool { + self.cluster_formed.load(Ordering::SeqCst) + } + + /// Raft выборы - начало кампании + pub fn start_election(&self) -> Result<()> { + if !self.cluster_formed.load(Ordering::SeqCst) { + return Err(crate::common::FutriixError::DatabaseError( + // ИЗМЕНЕНИЕ: Обновлено сообщение об ошибке + "Cluster not formed. Need at least 3 nodes.".to_string() + )); + } + + let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; + println!("Starting election for term {}", new_term); + + // Переход в состояние candidate + self.is_leader.store(false, Ordering::SeqCst); + + // TODO: Реализовать полную логику Raft выборов + // - Отправка RequestVote RPC на другие узлы + // - Сбор голосов + // - Переход в состояние Leader при получении большинства + + Ok(()) + } + + /// Отправка команды на репликацию + pub async fn replicate(&self, command: protocol::Command) -> Result<()> { + if !self.replication_enabled.load(Ordering::SeqCst) { + return Ok(()); + } + + self.replication_tx.send(ReplicationEvent::Command(command)).await + .map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string())) + } + + /// Запрос синхронизации с другими узлами + pub async fn request_sync(&self) -> Result<()> { + if !self.replication_enabled.load(Ordering::SeqCst) { + return Ok(()); + } + + self.replication_tx.send(ReplicationEvent::SyncRequest).await + .map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string())) + } + + /// Получение списка узлов репликации + pub fn get_nodes(&self) -> Vec { + self.nodes.iter() + .map(|entry| entry.value().clone()) + .collect() + } + + /// Получение текущего номера последовательности + pub fn get_sequence_number(&self) -> u64 { + self.sequence_number.load(Ordering::SeqCst) + } + + /// Проверка, включена ли репликация + pub fn is_replication_enabled(&self) -> bool { + self.replication_enabled.load(Ordering::SeqCst) + } + + /// Получение информации об узле + pub fn get_node(&self, node_id: &str) -> Option { + self.nodes.get(node_id).map(|entry| entry.value().clone()) + } +}