diff --git a/src/server/sharding.rs b/src/server/sharding.rs deleted file mode 100755 index a07e05e..0000000 --- a/src/server/sharding.rs +++ /dev/null @@ -1,602 +0,0 @@ -// src/server/sharding.rs -//! Модуль шардинга с консистентным хэшированием и Raft протоколом -//! -//! Объединяет функционал шардинга и репликации с wait-free архитектурой -//! и реализацией Raft консенсуса для работы в production. - -use std::collections::{HashMap, BTreeMap}; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use tokio::sync::mpsc; -use tokio::time::{interval, Duration}; -use tokio::io::AsyncWriteExt; -use serde::{Serialize, Deserialize}; -use siphasher::sip::SipHasher13; -use dashmap::DashMap; - -use crate::common::Result; -use crate::common::protocol; - -/// Состояния узла в Raft протоколе -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum RaftState { - Follower, - Candidate, - Leader, -} - -/// Информация о Raft узле -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RaftNode { - pub node_id: String, - pub address: String, - pub state: RaftState, - pub term: u64, - pub voted_for: Option, - pub last_heartbeat: i64, -} - -/// Информация о шард-узле с Raft -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ShardNode { - pub node_id: String, - pub address: String, - pub capacity: u64, - pub used: u64, - pub collections: Vec, - pub raft_info: RaftNode, -} - -/// Состояние шардинга для коллекции -#[derive(Debug, Clone)] -pub struct CollectionSharding { - pub shard_key: String, - pub virtual_nodes: usize, - pub ring: BTreeMap, // consistent hash ring -} - -/// События репликации -#[derive(Debug, Serialize, Deserialize)] -pub enum ReplicationEvent { - Command(protocol::Command), - SyncRequest, - Heartbeat, - RaftVoteRequest { term: u64, candidate_id: String }, - RaftVoteResponse { term: u64, vote_granted: bool }, - RaftAppendEntries { term: u64, leader_id: String }, -} - -/// Менеджер шардинга и репликации с Raft -#[derive(Clone)] -pub struct ShardingManager { - // Шардинг компоненты - nodes: Arc>, // Lock-free хранение узлов - collections: Arc>, // Lock-free хранение коллекций - virtual_nodes_per_node: usize, - - // Raft компоненты - current_term: Arc, // Текущий терм Raft - voted_for: Arc>, // Голоса за термы - is_leader: Arc, // Флаг лидера - cluster_formed: Arc, // Флаг сформированности кластера - - // Репликация компоненты - replication_tx: Arc>, - sequence_number: Arc, - replication_enabled: Arc, -} - -impl ShardingManager { - /// Создание нового менеджера шардинга и репликации - pub fn new(virtual_nodes_per_node: usize, replication_enabled: bool) -> Self { - let (tx, rx) = mpsc::channel(1000); // Убрал ненужный mut - - let manager = Self { - nodes: Arc::new(DashMap::new()), - collections: Arc::new(DashMap::new()), - virtual_nodes_per_node, - current_term: Arc::new(AtomicU64::new(0)), - voted_for: Arc::new(DashMap::new()), - is_leader: Arc::new(AtomicBool::new(false)), - cluster_formed: Arc::new(AtomicBool::new(false)), - replication_tx: Arc::new(tx), - sequence_number: Arc::new(AtomicU64::new(0)), - replication_enabled: Arc::new(AtomicBool::new(replication_enabled)), - }; - - // Запуск фоновой задачи обработки репликации и Raft - let manager_clone = manager.clone(); - tokio::spawn(async move { - manager_clone.run_replication_loop(rx).await; - }); - - manager - } - - /// Фоновая задача обработки репликации и Raft - async fn run_replication_loop(self, mut rx: mpsc::Receiver) { - let mut heartbeat_interval = interval(Duration::from_millis(1000)); - - loop { - tokio::select! { - Some(event) = rx.recv() => { - self.handle_replication_event(event).await; - } - _ = heartbeat_interval.tick() => { - if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) { - let _ = self.send_heartbeat().await; - } - } - } - } - } - - /// Обработка событий репликации - async fn handle_replication_event(&self, event: ReplicationEvent) { - if !self.replication_enabled.load(Ordering::SeqCst) { - return; - } - - match event { - ReplicationEvent::Command(cmd) => { - self.replicate_command(cmd).await; - } - ReplicationEvent::SyncRequest => { - self.sync_with_nodes().await; - } - ReplicationEvent::Heartbeat => { - let _ = self.send_heartbeat().await; - } - ReplicationEvent::RaftVoteRequest { term, candidate_id } => { - self.handle_vote_request(term, candidate_id).await; - } - ReplicationEvent::RaftVoteResponse { term, vote_granted } => { - self.handle_vote_response(term, vote_granted).await; - } - ReplicationEvent::RaftAppendEntries { term, leader_id } => { - self.handle_append_entries(term, leader_id).await; - } - } - } - - /// Репликация команды на другие узлы - async fn replicate_command(&self, command: protocol::Command) { - let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); - - for entry in self.nodes.iter() { - let node = entry.value(); - if node.raft_info.state != RaftState::Leader { - let node_addr = node.address.clone(); - let cmd_clone = command.clone(); - let seq_clone = sequence; - - tokio::spawn(async move { - if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { - eprintln!("Failed to replicate to {}: {}", node_addr, e); - } - }); - } - } - } - - /// Отправка команды на удаленный узел - async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { - let mut stream = match tokio::net::TcpStream::connect(node).await { - Ok(stream) => stream, - Err(e) => { - eprintln!("Failed to connect to {}: {}", node, e); - return Ok(()); - } - }; - - let message = protocol::ReplicationMessage { - sequence, - command: command.clone(), - timestamp: chrono::Utc::now().timestamp(), - }; - - let bytes = protocol::serialize(&message)?; - - if let Err(e) = stream.write_all(&bytes).await { - eprintln!("Failed to send command to {}: {}", node, e); - } - - Ok(()) - } - - /// Синхронизация с другими узлами - async fn sync_with_nodes(&self) { - println!("Starting sync with {} nodes", self.nodes.len()); - for entry in self.nodes.iter() { - let node_addr = entry.value().address.clone(); - tokio::spawn(async move { - if let Err(e) = Self::sync_with_node(&node_addr).await { - eprintln!("Failed to sync with {}: {}", node_addr, e); - } - }); - } - } - - /// Синхронизация с удаленным узлом - async fn sync_with_node(_node: &str) -> Result<()> { - // TODO: Реализовать wait-free синхронизацию данных - Ok(()) - } - - /// Отправка heartbeat - async fn send_heartbeat(&self) -> Result<()> { - for entry in self.nodes.iter() { - let node = entry.value(); - if node.raft_info.state == RaftState::Follower { - let node_addr = node.address.clone(); - tokio::spawn(async move { - if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await { - eprintln!("Heartbeat failed for {}: {}", node_addr, e); - } - }); - } - } - Ok(()) - } - - /// Отправка heartbeat на удаленный узел - async fn send_heartbeat_to_node(node: &str) -> Result<()> { - let mut stream = match tokio::net::TcpStream::connect(node).await { - Ok(stream) => stream, - Err(e) => { - eprintln!("Failed to connect to {} for heartbeat: {}", node, e); - return Ok(()); - } - }; - - let heartbeat = protocol::ReplicationMessage { - sequence: 0, - command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, - timestamp: chrono::Utc::now().timestamp(), - }; - - let bytes = protocol::serialize(&heartbeat)?; - - if let Err(e) = stream.write_all(&bytes).await { - eprintln!("Failed to send heartbeat to {}: {}", node, e); - } - - Ok(()) - } - - // Raft методы - /// Обработка запроса голоса - async fn handle_vote_request(&self, term: u64, candidate_id: String) { - let current_term = self.current_term.load(Ordering::SeqCst); - - if term > current_term { - self.current_term.store(term, Ordering::SeqCst); - self.voted_for.insert(term, candidate_id.clone()); - // TODO: Отправить положительный ответ - } - // TODO: Отправить отрицательный ответ если условия не выполнены - } - - /// Обработка ответа голоса - async fn handle_vote_response(&self, term: u64, vote_granted: bool) { - if vote_granted && term == self.current_term.load(Ordering::SeqCst) { - // TODO: Подсчитать голоса и перейти в лидеры при большинстве - } - } - - /// Обработка AppendEntries RPC - async fn handle_append_entries(&self, term: u64, leader_id: String) { - let current_term = self.current_term.load(Ordering::SeqCst); - - if term >= current_term { - self.current_term.store(term, Ordering::SeqCst); - self.is_leader.store(false, Ordering::SeqCst); - // TODO: Обновить состояние follower - } - } - - // Шардинг методы - /// Добавление шард-узла с Raft информацией - pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { - let raft_node = RaftNode { - node_id: node_id.clone(), - address: address.clone(), - state: RaftState::Follower, - term: 0, - voted_for: None, - last_heartbeat: chrono::Utc::now().timestamp(), - }; - - let node = ShardNode { - node_id: node_id.clone(), - address, - capacity, - used: 0, - collections: Vec::new(), - raft_info: raft_node, - }; - - self.nodes.insert(node_id, node); - - // Проверяем сформированность кластера (минимум 2 узла для шардинга) - if self.nodes.len() >= 2 { - self.cluster_formed.store(true, Ordering::SeqCst); - println!("Cluster formed with {} nodes", self.nodes.len()); - } - - Ok(()) - } - - /// Удаление шард-узла - pub fn remove_node(&self, node_id: &str) -> Result<()> { - self.nodes.remove(node_id); - - // Проверяем сформированность кластера после удаления - if self.nodes.len() < 2 { - self.cluster_formed.store(false, Ordering::SeqCst); - self.is_leader.store(false, Ordering::SeqCst); - } - - Ok(()) - } - - /// Настройка шардинга для коллекции - pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { - // Проверка наличия кластера перед настройкой шардинга - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::DatabaseError( - "Cannot setup sharding: cluster not formed. Need at least 2 nodes.".to_string() - )); - } - - let sharding = CollectionSharding { - shard_key: shard_key.to_string(), - virtual_nodes: self.virtual_nodes_per_node, - ring: BTreeMap::new(), - }; - - self.collections.insert(collection.to_string(), sharding); - self.rebuild_ring(collection)?; - Ok(()) - } - - /// Перестроение хэш-ринга для коллекции - fn rebuild_ring(&self, collection: &str) -> Result<()> { - if let Some(mut sharding) = self.collections.get_mut(collection) { - sharding.ring.clear(); - - for entry in self.nodes.iter() { - let node_id = entry.key(); - for i in 0..sharding.virtual_nodes { - let key = format!("{}-{}", node_id, i); - let hash = self.hash_key(&key); - sharding.ring.insert(hash, node_id.clone()); - } - } - } - - Ok(()) - } - - /// Хэширование ключа - fn hash_key(&self, key: &str) -> u64 { - let mut hasher = SipHasher13::new(); - key.hash(&mut hasher); - hasher.finish() - } - - /// Поиск узла для ключа - pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result> { - // Проверка наличия кластера перед поиском узла - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::DatabaseError( - "Cannot find node: cluster not formed. Need at least 2 nodes.".to_string() - )); - } - - if let Some(sharding) = self.collections.get(collection) { - let key_hash = self.hash_key(key_value); - - // Поиск в хэш-ринге (консистентное хэширование) - let mut range = sharding.ring.range(key_hash..); - if let Some((_, node_id)) = range.next() { - return Ok(Some(node_id.clone())); - } - - // Если не найдено в верхней части ринга, берем первый узел - if let Some((_, node_id)) = sharding.ring.iter().next() { - return Ok(Some(node_id.clone())); - } - } - - Ok(None) - } - - /// Миграция шарда - pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { - // Проверка наличия кластера перед миграцией - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::DatabaseError( - "Cannot migrate shard: cluster not formed. Need at least 2 nodes.".to_string() - )); - } - - println!("Migrating shard for collection '{}' from {} to {} with key {}", - collection, from_node, to_node, shard_key); - - self.rebuild_ring(collection)?; - Ok(()) - } - - /// Ребалансировка кластера - pub fn rebalance_cluster(&self) -> Result<()> { - // Проверка наличия кластера перед ребалансировкой - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::DatabaseError( - "Cannot rebalance cluster: cluster not formed. Need at least 2 nodes.".to_string() - )); - } - - println!("Rebalancing cluster with {} nodes", self.nodes.len()); - - // Перестраиваем все хэш-ринги - for mut entry in self.collections.iter_mut() { - let sharding = entry.value_mut(); - sharding.ring.clear(); - - for node_entry in self.nodes.iter() { - let node_id = node_entry.key(); - for i in 0..sharding.virtual_nodes { - let key = format!("{}-{}", node_id, i); - let hash = self.hash_key(&key); - sharding.ring.insert(hash, node_id.clone()); - } - } - } - - // Ребалансировка узлов кластера - self.rebalance_nodes()?; - - Ok(()) - } - - /// Ребалансировка узлов кластера - fn rebalance_nodes(&self) -> Result<()> { - println!("Rebalancing nodes in cluster..."); - - // Рассчитываем среднюю загрузку - let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum(); - let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum(); - let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; - - println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); - - // TODO: Реализовать алгоритм ребалансировки узлов - // - Определить перегруженные и недогруженные узлы - // - Перераспределить данные между узлами - // - Обновить метаданные шардинга - - Ok(()) - } - - /// Получение статуса кластера - pub fn get_cluster_status(&self) -> Result { - let mut cluster_nodes = Vec::new(); - let mut total_capacity = 0; - let mut total_used = 0; - let mut raft_nodes = Vec::new(); - - for entry in self.nodes.iter() { - let node = entry.value(); - total_capacity += node.capacity; - total_used += node.used; - - cluster_nodes.push(protocol::ShardInfo { - node_id: node.node_id.clone(), - address: node.address.clone(), - capacity: node.capacity, - used: node.used, - collections: node.collections.clone(), - }); - - raft_nodes.push(protocol::RaftNodeInfo { - node_id: node.node_id.clone(), - address: node.address.clone(), - state: match node.raft_info.state { - RaftState::Leader => "leader".to_string(), - RaftState::Follower => "follower".to_string(), - RaftState::Candidate => "candidate".to_string(), - }, - term: node.raft_info.term, - last_heartbeat: node.raft_info.last_heartbeat, - }); - } - - Ok(protocol::ClusterStatus { - nodes: cluster_nodes, - total_capacity, - total_used, - rebalance_needed: false, - cluster_formed: self.cluster_formed.load(Ordering::SeqCst), - leader_exists: self.is_leader.load(Ordering::SeqCst), - raft_nodes, - }) - } - - /// Получение списка Raft узлов - pub fn get_raft_nodes(&self) -> Vec { - self.nodes.iter() - .map(|entry| entry.value().raft_info.clone()) - .collect() - } - - /// Проверка сформированности кластера - pub fn is_cluster_formed(&self) -> bool { - self.cluster_formed.load(Ordering::SeqCst) - } - - /// Raft выборы - начало кампании - pub fn start_election(&self) -> Result<()> { - if !self.cluster_formed.load(Ordering::SeqCst) { - return Err(crate::common::FutriixError::DatabaseError( - "Cluster not formed. Need at least 2 nodes.".to_string() - )); - } - - let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; - println!("Starting election for term {}", new_term); - - // Переход в состояние candidate - self.is_leader.store(false, Ordering::SeqCst); - - // TODO: Реализовать полную логику Raft выборов - // - Отправка RequestVote RPC на другие узлы - // - Сбор голосов - // - Переход в состояние Leader при получении большинства - - Ok(()) - } - - /// Отправка команды на репликацию - pub async fn replicate(&self, command: protocol::Command) -> Result<()> { - if !self.replication_enabled.load(Ordering::SeqCst) { - return Ok(()); - } - - self.replication_tx.send(ReplicationEvent::Command(command)).await - .map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string())) - } - - /// Запрос синхронизации с другими узлами - pub async fn request_sync(&self) -> Result<()> { - if !self.replication_enabled.load(Ordering::SeqCst) { - return Ok(()); - } - - self.replication_tx.send(ReplicationEvent::SyncRequest).await - .map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string())) - } - - /// Получение списка узлов репликации - pub fn get_nodes(&self) -> Vec { - self.nodes.iter() - .map(|entry| entry.value().clone()) - .collect() - } - - /// Получение текущего номера последовательности - pub fn get_sequence_number(&self) -> u64 { - self.sequence_number.load(Ordering::SeqCst) - } - - /// Проверка, включена ли репликация - pub fn is_replication_enabled(&self) -> bool { - self.replication_enabled.load(Ordering::SeqCst) - } - - /// Получение информации об узле - pub fn get_node(&self, node_id: &str) -> Option { - self.nodes.get(node_id).map(|entry| entry.value().clone()) - } -}