futriix/src/server/sharding.rs

603 lines
23 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// src/server/sharding.rs
//! Модуль шардинга с консистентным хэшированием и Raft протоколом
//!
//! Объединяет функционал шардинга и репликации с wait-free архитектурой
//! и реализацией Raft консенсуса для работы в production.
use std::collections::{HashMap, BTreeMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
use tokio::io::AsyncWriteExt;
use serde::{Serialize, Deserialize};
use siphasher::sip::SipHasher13;
use dashmap::DashMap;
use crate::common::Result;
use crate::common::protocol;
/// Состояния узла в Raft протоколе
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RaftState {
Follower,
Candidate,
Leader,
}
/// Информация о Raft узле
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftNode {
pub node_id: String,
pub address: String,
pub state: RaftState,
pub term: u64,
pub voted_for: Option<String>,
pub last_heartbeat: i64,
}
/// Информация о шард-узле с Raft
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardNode {
pub node_id: String,
pub address: String,
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
pub raft_info: RaftNode,
}
/// Состояние шардинга для коллекции
#[derive(Debug, Clone)]
pub struct CollectionSharding {
pub shard_key: String,
pub virtual_nodes: usize,
pub ring: BTreeMap<u64, String>, // consistent hash ring
}
/// События репликации
#[derive(Debug, Serialize, Deserialize)]
pub enum ReplicationEvent {
Command(protocol::Command),
SyncRequest,
Heartbeat,
RaftVoteRequest { term: u64, candidate_id: String },
RaftVoteResponse { term: u64, vote_granted: bool },
RaftAppendEntries { term: u64, leader_id: String },
}
/// Менеджер шардинга и репликации с Raft
#[derive(Clone)]
pub struct ShardingManager {
// Шардинг компоненты
nodes: Arc<DashMap<String, ShardNode>>, // Lock-free хранение узлов
collections: Arc<DashMap<String, CollectionSharding>>, // Lock-free хранение коллекций
virtual_nodes_per_node: usize,
// Raft компоненты
current_term: Arc<AtomicU64>, // Текущий терм Raft
voted_for: Arc<DashMap<u64, String>>, // Голоса за термы
is_leader: Arc<AtomicBool>, // Флаг лидера
cluster_formed: Arc<AtomicBool>, // Флаг сформированности кластера
// Репликация компоненты
replication_tx: Arc<mpsc::Sender<ReplicationEvent>>,
sequence_number: Arc<AtomicU64>,
replication_enabled: Arc<AtomicBool>,
}
impl ShardingManager {
/// Создание нового менеджера шардинга и репликации
pub fn new(virtual_nodes_per_node: usize, replication_enabled: bool) -> Self {
let (tx, rx) = mpsc::channel(1000); // Убрал ненужный mut
let manager = Self {
nodes: Arc::new(DashMap::new()),
collections: Arc::new(DashMap::new()),
virtual_nodes_per_node,
current_term: Arc::new(AtomicU64::new(0)),
voted_for: Arc::new(DashMap::new()),
is_leader: Arc::new(AtomicBool::new(false)),
cluster_formed: Arc::new(AtomicBool::new(false)),
replication_tx: Arc::new(tx),
sequence_number: Arc::new(AtomicU64::new(0)),
replication_enabled: Arc::new(AtomicBool::new(replication_enabled)),
};
// Запуск фоновой задачи обработки репликации и Raft
let manager_clone = manager.clone();
tokio::spawn(async move {
manager_clone.run_replication_loop(rx).await;
});
manager
}
/// Фоновая задача обработки репликации и Raft
async fn run_replication_loop(self, mut rx: mpsc::Receiver<ReplicationEvent>) {
let mut heartbeat_interval = interval(Duration::from_millis(1000));
loop {
tokio::select! {
Some(event) = rx.recv() => {
self.handle_replication_event(event).await;
}
_ = heartbeat_interval.tick() => {
if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) {
let _ = self.send_heartbeat().await;
}
}
}
}
}
/// Обработка событий репликации
async fn handle_replication_event(&self, event: ReplicationEvent) {
if !self.replication_enabled.load(Ordering::SeqCst) {
return;
}
match event {
ReplicationEvent::Command(cmd) => {
self.replicate_command(cmd).await;
}
ReplicationEvent::SyncRequest => {
self.sync_with_nodes().await;
}
ReplicationEvent::Heartbeat => {
let _ = self.send_heartbeat().await;
}
ReplicationEvent::RaftVoteRequest { term, candidate_id } => {
self.handle_vote_request(term, candidate_id).await;
}
ReplicationEvent::RaftVoteResponse { term, vote_granted } => {
self.handle_vote_response(term, vote_granted).await;
}
ReplicationEvent::RaftAppendEntries { term, leader_id } => {
self.handle_append_entries(term, leader_id).await;
}
}
}
/// Репликация команды на другие узлы
async fn replicate_command(&self, command: protocol::Command) {
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
for entry in self.nodes.iter() {
let node = entry.value();
if node.raft_info.state != RaftState::Leader {
let node_addr = node.address.clone();
let cmd_clone = command.clone();
let seq_clone = sequence;
tokio::spawn(async move {
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
eprintln!("Failed to replicate to {}: {}", node_addr, e);
}
});
}
}
}
/// Отправка команды на удаленный узел
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream,
Err(e) => {
eprintln!("Failed to connect to {}: {}", node, e);
return Ok(());
}
};
let message = protocol::ReplicationMessage {
sequence,
command: command.clone(),
timestamp: chrono::Utc::now().timestamp(),
};
let bytes = protocol::serialize(&message)?;
if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send command to {}: {}", node, e);
}
Ok(())
}
/// Синхронизация с другими узлами
async fn sync_with_nodes(&self) {
println!("Starting sync with {} nodes", self.nodes.len());
for entry in self.nodes.iter() {
let node_addr = entry.value().address.clone();
tokio::spawn(async move {
if let Err(e) = Self::sync_with_node(&node_addr).await {
eprintln!("Failed to sync with {}: {}", node_addr, e);
}
});
}
}
/// Синхронизация с удаленным узлом
async fn sync_with_node(_node: &str) -> Result<()> {
// TODO: Реализовать wait-free синхронизацию данных
Ok(())
}
/// Отправка heartbeat
async fn send_heartbeat(&self) -> Result<()> {
for entry in self.nodes.iter() {
let node = entry.value();
if node.raft_info.state == RaftState::Follower {
let node_addr = node.address.clone();
tokio::spawn(async move {
if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await {
eprintln!("Heartbeat failed for {}: {}", node_addr, e);
}
});
}
}
Ok(())
}
/// Отправка heartbeat на удаленный узел
async fn send_heartbeat_to_node(node: &str) -> Result<()> {
let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream,
Err(e) => {
eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
return Ok(());
}
};
let heartbeat = protocol::ReplicationMessage {
sequence: 0,
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
timestamp: chrono::Utc::now().timestamp(),
};
let bytes = protocol::serialize(&heartbeat)?;
if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send heartbeat to {}: {}", node, e);
}
Ok(())
}
// Raft методы
/// Обработка запроса голоса
async fn handle_vote_request(&self, term: u64, candidate_id: String) {
let current_term = self.current_term.load(Ordering::SeqCst);
if term > current_term {
self.current_term.store(term, Ordering::SeqCst);
self.voted_for.insert(term, candidate_id.clone());
// TODO: Отправить положительный ответ
}
// TODO: Отправить отрицательный ответ если условия не выполнены
}
/// Обработка ответа голоса
async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
if vote_granted && term == self.current_term.load(Ordering::SeqCst) {
// TODO: Подсчитать голоса и перейти в лидеры при большинстве
}
}
/// Обработка AppendEntries RPC
async fn handle_append_entries(&self, term: u64, leader_id: String) {
let current_term = self.current_term.load(Ordering::SeqCst);
if term >= current_term {
self.current_term.store(term, Ordering::SeqCst);
self.is_leader.store(false, Ordering::SeqCst);
// TODO: Обновить состояние follower
}
}
// Шардинг методы
/// Добавление шард-узла с Raft информацией
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
let raft_node = RaftNode {
node_id: node_id.clone(),
address: address.clone(),
state: RaftState::Follower,
term: 0,
voted_for: None,
last_heartbeat: chrono::Utc::now().timestamp(),
};
let node = ShardNode {
node_id: node_id.clone(),
address,
capacity,
used: 0,
collections: Vec::new(),
raft_info: raft_node,
};
self.nodes.insert(node_id, node);
// Проверяем сформированность кластера (минимум 2 узла для шардинга)
if self.nodes.len() >= 2 {
self.cluster_formed.store(true, Ordering::SeqCst);
println!("Cluster formed with {} nodes", self.nodes.len());
}
Ok(())
}
/// Удаление шард-узла
pub fn remove_node(&self, node_id: &str) -> Result<()> {
self.nodes.remove(node_id);
// Проверяем сформированность кластера после удаления
if self.nodes.len() < 2 {
self.cluster_formed.store(false, Ordering::SeqCst);
self.is_leader.store(false, Ordering::SeqCst);
}
Ok(())
}
/// Настройка шардинга для коллекции
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
// Проверка наличия кластера перед настройкой шардинга
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot setup sharding: cluster not formed. Need at least 2 nodes.".to_string()
));
}
let sharding = CollectionSharding {
shard_key: shard_key.to_string(),
virtual_nodes: self.virtual_nodes_per_node,
ring: BTreeMap::new(),
};
self.collections.insert(collection.to_string(), sharding);
self.rebuild_ring(collection)?;
Ok(())
}
/// Перестроение хэш-ринга для коллекции
fn rebuild_ring(&self, collection: &str) -> Result<()> {
if let Some(mut sharding) = self.collections.get_mut(collection) {
sharding.ring.clear();
for entry in self.nodes.iter() {
let node_id = entry.key();
for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i);
let hash = self.hash_key(&key);
sharding.ring.insert(hash, node_id.clone());
}
}
}
Ok(())
}
/// Хэширование ключа
fn hash_key(&self, key: &str) -> u64 {
let mut hasher = SipHasher13::new();
key.hash(&mut hasher);
hasher.finish()
}
/// Поиск узла для ключа
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
// Проверка наличия кластера перед поиском узла
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot find node: cluster not formed. Need at least 2 nodes.".to_string()
));
}
if let Some(sharding) = self.collections.get(collection) {
let key_hash = self.hash_key(key_value);
// Поиск в хэш-ринге (консистентное хэширование)
let mut range = sharding.ring.range(key_hash..);
if let Some((_, node_id)) = range.next() {
return Ok(Some(node_id.clone()));
}
// Если не найдено в верхней части ринга, берем первый узел
if let Some((_, node_id)) = sharding.ring.iter().next() {
return Ok(Some(node_id.clone()));
}
}
Ok(None)
}
/// Миграция шарда
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
// Проверка наличия кластера перед миграцией
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot migrate shard: cluster not formed. Need at least 2 nodes.".to_string()
));
}
println!("Migrating shard for collection '{}' from {} to {} with key {}",
collection, from_node, to_node, shard_key);
self.rebuild_ring(collection)?;
Ok(())
}
/// Ребалансировка кластера
pub fn rebalance_cluster(&self) -> Result<()> {
// Проверка наличия кластера перед ребалансировкой
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot rebalance cluster: cluster not formed. Need at least 2 nodes.".to_string()
));
}
println!("Rebalancing cluster with {} nodes", self.nodes.len());
// Перестраиваем все хэш-ринги
for mut entry in self.collections.iter_mut() {
let sharding = entry.value_mut();
sharding.ring.clear();
for node_entry in self.nodes.iter() {
let node_id = node_entry.key();
for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i);
let hash = self.hash_key(&key);
sharding.ring.insert(hash, node_id.clone());
}
}
}
// Ребалансировка узлов кластера
self.rebalance_nodes()?;
Ok(())
}
/// Ребалансировка узлов кластера
fn rebalance_nodes(&self) -> Result<()> {
println!("Rebalancing nodes in cluster...");
// Рассчитываем среднюю загрузку
let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum();
let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum();
let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 };
println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);
// TODO: Реализовать алгоритм ребалансировки узлов
// - Определить перегруженные и недогруженные узлы
// - Перераспределить данные между узлами
// - Обновить метаданные шардинга
Ok(())
}
/// Получение статуса кластера
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
let mut cluster_nodes = Vec::new();
let mut total_capacity = 0;
let mut total_used = 0;
let mut raft_nodes = Vec::new();
for entry in self.nodes.iter() {
let node = entry.value();
total_capacity += node.capacity;
total_used += node.used;
cluster_nodes.push(protocol::ShardInfo {
node_id: node.node_id.clone(),
address: node.address.clone(),
capacity: node.capacity,
used: node.used,
collections: node.collections.clone(),
});
raft_nodes.push(protocol::RaftNodeInfo {
node_id: node.node_id.clone(),
address: node.address.clone(),
state: match node.raft_info.state {
RaftState::Leader => "leader".to_string(),
RaftState::Follower => "follower".to_string(),
RaftState::Candidate => "candidate".to_string(),
},
term: node.raft_info.term,
last_heartbeat: node.raft_info.last_heartbeat,
});
}
Ok(protocol::ClusterStatus {
nodes: cluster_nodes,
total_capacity,
total_used,
rebalance_needed: false,
cluster_formed: self.cluster_formed.load(Ordering::SeqCst),
leader_exists: self.is_leader.load(Ordering::SeqCst),
raft_nodes,
})
}
/// Получение списка Raft узлов
pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
self.nodes.iter()
.map(|entry| entry.value().raft_info.clone())
.collect()
}
/// Проверка сформированности кластера
pub fn is_cluster_formed(&self) -> bool {
self.cluster_formed.load(Ordering::SeqCst)
}
/// Raft выборы - начало кампании
pub fn start_election(&self) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cluster not formed. Need at least 2 nodes.".to_string()
));
}
let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
println!("Starting election for term {}", new_term);
// Переход в состояние candidate
self.is_leader.store(false, Ordering::SeqCst);
// TODO: Реализовать полную логику Raft выборов
// - Отправка RequestVote RPC на другие узлы
// - Сбор голосов
// - Переход в состояние Leader при получении большинства
Ok(())
}
/// Отправка команды на репликацию
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
if !self.replication_enabled.load(Ordering::SeqCst) {
return Ok(());
}
self.replication_tx.send(ReplicationEvent::Command(command)).await
.map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string()))
}
/// Запрос синхронизации с другими узлами
pub async fn request_sync(&self) -> Result<()> {
if !self.replication_enabled.load(Ordering::SeqCst) {
return Ok(());
}
self.replication_tx.send(ReplicationEvent::SyncRequest).await
.map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string()))
}
/// Получение списка узлов репликации
pub fn get_nodes(&self) -> Vec<ShardNode> {
self.nodes.iter()
.map(|entry| entry.value().clone())
.collect()
}
/// Получение текущего номера последовательности
pub fn get_sequence_number(&self) -> u64 {
self.sequence_number.load(Ordering::SeqCst)
}
/// Проверка, включена ли репликация
pub fn is_replication_enabled(&self) -> bool {
self.replication_enabled.load(Ordering::SeqCst)
}
/// Получение информации об узле
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
self.nodes.get(node_id).map(|entry| entry.value().clone())
}
}