Delete src/server/sharding.rs
This commit is contained in:
parent
a38e05def5
commit
cb4610fcd2
@ -1,602 +0,0 @@
|
|||||||
// src/server/sharding.rs
|
|
||||||
//! Модуль шардинга с консистентным хэшированием и Raft протоколом
|
|
||||||
//!
|
|
||||||
//! Объединяет функционал шардинга и репликации с wait-free архитектурой
|
|
||||||
//! и реализацией Raft консенсуса для работы в production.
|
|
||||||
|
|
||||||
use std::collections::{HashMap, BTreeMap};
|
|
||||||
use std::hash::{Hash, Hasher};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio::time::{interval, Duration};
|
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
use serde::{Serialize, Deserialize};
|
|
||||||
use siphasher::sip::SipHasher13;
|
|
||||||
use dashmap::DashMap;
|
|
||||||
|
|
||||||
use crate::common::Result;
|
|
||||||
use crate::common::protocol;
|
|
||||||
|
|
||||||
/// Состояния узла в Raft протоколе
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
||||||
pub enum RaftState {
|
|
||||||
Follower,
|
|
||||||
Candidate,
|
|
||||||
Leader,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Информация о Raft узле
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct RaftNode {
|
|
||||||
pub node_id: String,
|
|
||||||
pub address: String,
|
|
||||||
pub state: RaftState,
|
|
||||||
pub term: u64,
|
|
||||||
pub voted_for: Option<String>,
|
|
||||||
pub last_heartbeat: i64,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Информация о шард-узле с Raft
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct ShardNode {
|
|
||||||
pub node_id: String,
|
|
||||||
pub address: String,
|
|
||||||
pub capacity: u64,
|
|
||||||
pub used: u64,
|
|
||||||
pub collections: Vec<String>,
|
|
||||||
pub raft_info: RaftNode,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Состояние шардинга для коллекции
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct CollectionSharding {
|
|
||||||
pub shard_key: String,
|
|
||||||
pub virtual_nodes: usize,
|
|
||||||
pub ring: BTreeMap<u64, String>, // consistent hash ring
|
|
||||||
}
|
|
||||||
|
|
||||||
/// События репликации
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub enum ReplicationEvent {
|
|
||||||
Command(protocol::Command),
|
|
||||||
SyncRequest,
|
|
||||||
Heartbeat,
|
|
||||||
RaftVoteRequest { term: u64, candidate_id: String },
|
|
||||||
RaftVoteResponse { term: u64, vote_granted: bool },
|
|
||||||
RaftAppendEntries { term: u64, leader_id: String },
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Менеджер шардинга и репликации с Raft
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ShardingManager {
|
|
||||||
// Шардинг компоненты
|
|
||||||
nodes: Arc<DashMap<String, ShardNode>>, // Lock-free хранение узлов
|
|
||||||
collections: Arc<DashMap<String, CollectionSharding>>, // Lock-free хранение коллекций
|
|
||||||
virtual_nodes_per_node: usize,
|
|
||||||
|
|
||||||
// Raft компоненты
|
|
||||||
current_term: Arc<AtomicU64>, // Текущий терм Raft
|
|
||||||
voted_for: Arc<DashMap<u64, String>>, // Голоса за термы
|
|
||||||
is_leader: Arc<AtomicBool>, // Флаг лидера
|
|
||||||
cluster_formed: Arc<AtomicBool>, // Флаг сформированности кластера
|
|
||||||
|
|
||||||
// Репликация компоненты
|
|
||||||
replication_tx: Arc<mpsc::Sender<ReplicationEvent>>,
|
|
||||||
sequence_number: Arc<AtomicU64>,
|
|
||||||
replication_enabled: Arc<AtomicBool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ShardingManager {
|
|
||||||
/// Создание нового менеджера шардинга и репликации
|
|
||||||
pub fn new(virtual_nodes_per_node: usize, replication_enabled: bool) -> Self {
|
|
||||||
let (tx, rx) = mpsc::channel(1000); // Убрал ненужный mut
|
|
||||||
|
|
||||||
let manager = Self {
|
|
||||||
nodes: Arc::new(DashMap::new()),
|
|
||||||
collections: Arc::new(DashMap::new()),
|
|
||||||
virtual_nodes_per_node,
|
|
||||||
current_term: Arc::new(AtomicU64::new(0)),
|
|
||||||
voted_for: Arc::new(DashMap::new()),
|
|
||||||
is_leader: Arc::new(AtomicBool::new(false)),
|
|
||||||
cluster_formed: Arc::new(AtomicBool::new(false)),
|
|
||||||
replication_tx: Arc::new(tx),
|
|
||||||
sequence_number: Arc::new(AtomicU64::new(0)),
|
|
||||||
replication_enabled: Arc::new(AtomicBool::new(replication_enabled)),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Запуск фоновой задачи обработки репликации и Raft
|
|
||||||
let manager_clone = manager.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
manager_clone.run_replication_loop(rx).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
manager
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Фоновая задача обработки репликации и Raft
|
|
||||||
async fn run_replication_loop(self, mut rx: mpsc::Receiver<ReplicationEvent>) {
|
|
||||||
let mut heartbeat_interval = interval(Duration::from_millis(1000));
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
Some(event) = rx.recv() => {
|
|
||||||
self.handle_replication_event(event).await;
|
|
||||||
}
|
|
||||||
_ = heartbeat_interval.tick() => {
|
|
||||||
if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) {
|
|
||||||
let _ = self.send_heartbeat().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Обработка событий репликации
|
|
||||||
async fn handle_replication_event(&self, event: ReplicationEvent) {
|
|
||||||
if !self.replication_enabled.load(Ordering::SeqCst) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
match event {
|
|
||||||
ReplicationEvent::Command(cmd) => {
|
|
||||||
self.replicate_command(cmd).await;
|
|
||||||
}
|
|
||||||
ReplicationEvent::SyncRequest => {
|
|
||||||
self.sync_with_nodes().await;
|
|
||||||
}
|
|
||||||
ReplicationEvent::Heartbeat => {
|
|
||||||
let _ = self.send_heartbeat().await;
|
|
||||||
}
|
|
||||||
ReplicationEvent::RaftVoteRequest { term, candidate_id } => {
|
|
||||||
self.handle_vote_request(term, candidate_id).await;
|
|
||||||
}
|
|
||||||
ReplicationEvent::RaftVoteResponse { term, vote_granted } => {
|
|
||||||
self.handle_vote_response(term, vote_granted).await;
|
|
||||||
}
|
|
||||||
ReplicationEvent::RaftAppendEntries { term, leader_id } => {
|
|
||||||
self.handle_append_entries(term, leader_id).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Репликация команды на другие узлы
|
|
||||||
async fn replicate_command(&self, command: protocol::Command) {
|
|
||||||
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
|
|
||||||
|
|
||||||
for entry in self.nodes.iter() {
|
|
||||||
let node = entry.value();
|
|
||||||
if node.raft_info.state != RaftState::Leader {
|
|
||||||
let node_addr = node.address.clone();
|
|
||||||
let cmd_clone = command.clone();
|
|
||||||
let seq_clone = sequence;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
|
|
||||||
eprintln!("Failed to replicate to {}: {}", node_addr, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Отправка команды на удаленный узел
|
|
||||||
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
|
|
||||||
let mut stream = match tokio::net::TcpStream::connect(node).await {
|
|
||||||
Ok(stream) => stream,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to connect to {}: {}", node, e);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let message = protocol::ReplicationMessage {
|
|
||||||
sequence,
|
|
||||||
command: command.clone(),
|
|
||||||
timestamp: chrono::Utc::now().timestamp(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let bytes = protocol::serialize(&message)?;
|
|
||||||
|
|
||||||
if let Err(e) = stream.write_all(&bytes).await {
|
|
||||||
eprintln!("Failed to send command to {}: {}", node, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Синхронизация с другими узлами
|
|
||||||
async fn sync_with_nodes(&self) {
|
|
||||||
println!("Starting sync with {} nodes", self.nodes.len());
|
|
||||||
for entry in self.nodes.iter() {
|
|
||||||
let node_addr = entry.value().address.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::sync_with_node(&node_addr).await {
|
|
||||||
eprintln!("Failed to sync with {}: {}", node_addr, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Синхронизация с удаленным узлом
|
|
||||||
async fn sync_with_node(_node: &str) -> Result<()> {
|
|
||||||
// TODO: Реализовать wait-free синхронизацию данных
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Отправка heartbeat
|
|
||||||
async fn send_heartbeat(&self) -> Result<()> {
|
|
||||||
for entry in self.nodes.iter() {
|
|
||||||
let node = entry.value();
|
|
||||||
if node.raft_info.state == RaftState::Follower {
|
|
||||||
let node_addr = node.address.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await {
|
|
||||||
eprintln!("Heartbeat failed for {}: {}", node_addr, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Отправка heartbeat на удаленный узел
|
|
||||||
async fn send_heartbeat_to_node(node: &str) -> Result<()> {
|
|
||||||
let mut stream = match tokio::net::TcpStream::connect(node).await {
|
|
||||||
Ok(stream) => stream,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let heartbeat = protocol::ReplicationMessage {
|
|
||||||
sequence: 0,
|
|
||||||
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
|
|
||||||
timestamp: chrono::Utc::now().timestamp(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let bytes = protocol::serialize(&heartbeat)?;
|
|
||||||
|
|
||||||
if let Err(e) = stream.write_all(&bytes).await {
|
|
||||||
eprintln!("Failed to send heartbeat to {}: {}", node, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Raft методы
|
|
||||||
/// Обработка запроса голоса
|
|
||||||
async fn handle_vote_request(&self, term: u64, candidate_id: String) {
|
|
||||||
let current_term = self.current_term.load(Ordering::SeqCst);
|
|
||||||
|
|
||||||
if term > current_term {
|
|
||||||
self.current_term.store(term, Ordering::SeqCst);
|
|
||||||
self.voted_for.insert(term, candidate_id.clone());
|
|
||||||
// TODO: Отправить положительный ответ
|
|
||||||
}
|
|
||||||
// TODO: Отправить отрицательный ответ если условия не выполнены
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Обработка ответа голоса
|
|
||||||
async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
|
|
||||||
if vote_granted && term == self.current_term.load(Ordering::SeqCst) {
|
|
||||||
// TODO: Подсчитать голоса и перейти в лидеры при большинстве
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Обработка AppendEntries RPC
|
|
||||||
async fn handle_append_entries(&self, term: u64, leader_id: String) {
|
|
||||||
let current_term = self.current_term.load(Ordering::SeqCst);
|
|
||||||
|
|
||||||
if term >= current_term {
|
|
||||||
self.current_term.store(term, Ordering::SeqCst);
|
|
||||||
self.is_leader.store(false, Ordering::SeqCst);
|
|
||||||
// TODO: Обновить состояние follower
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Шардинг методы
|
|
||||||
/// Добавление шард-узла с Raft информацией
|
|
||||||
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
|
|
||||||
let raft_node = RaftNode {
|
|
||||||
node_id: node_id.clone(),
|
|
||||||
address: address.clone(),
|
|
||||||
state: RaftState::Follower,
|
|
||||||
term: 0,
|
|
||||||
voted_for: None,
|
|
||||||
last_heartbeat: chrono::Utc::now().timestamp(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let node = ShardNode {
|
|
||||||
node_id: node_id.clone(),
|
|
||||||
address,
|
|
||||||
capacity,
|
|
||||||
used: 0,
|
|
||||||
collections: Vec::new(),
|
|
||||||
raft_info: raft_node,
|
|
||||||
};
|
|
||||||
|
|
||||||
self.nodes.insert(node_id, node);
|
|
||||||
|
|
||||||
// Проверяем сформированность кластера (минимум 2 узла для шардинга)
|
|
||||||
if self.nodes.len() >= 2 {
|
|
||||||
self.cluster_formed.store(true, Ordering::SeqCst);
|
|
||||||
println!("Cluster formed with {} nodes", self.nodes.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Удаление шард-узла
|
|
||||||
pub fn remove_node(&self, node_id: &str) -> Result<()> {
|
|
||||||
self.nodes.remove(node_id);
|
|
||||||
|
|
||||||
// Проверяем сформированность кластера после удаления
|
|
||||||
if self.nodes.len() < 2 {
|
|
||||||
self.cluster_formed.store(false, Ordering::SeqCst);
|
|
||||||
self.is_leader.store(false, Ordering::SeqCst);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Настройка шардинга для коллекции
|
|
||||||
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
|
|
||||||
// Проверка наличия кластера перед настройкой шардинга
|
|
||||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
|
||||||
return Err(crate::common::FutriixError::DatabaseError(
|
|
||||||
"Cannot setup sharding: cluster not formed. Need at least 2 nodes.".to_string()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let sharding = CollectionSharding {
|
|
||||||
shard_key: shard_key.to_string(),
|
|
||||||
virtual_nodes: self.virtual_nodes_per_node,
|
|
||||||
ring: BTreeMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
self.collections.insert(collection.to_string(), sharding);
|
|
||||||
self.rebuild_ring(collection)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Перестроение хэш-ринга для коллекции
|
|
||||||
fn rebuild_ring(&self, collection: &str) -> Result<()> {
|
|
||||||
if let Some(mut sharding) = self.collections.get_mut(collection) {
|
|
||||||
sharding.ring.clear();
|
|
||||||
|
|
||||||
for entry in self.nodes.iter() {
|
|
||||||
let node_id = entry.key();
|
|
||||||
for i in 0..sharding.virtual_nodes {
|
|
||||||
let key = format!("{}-{}", node_id, i);
|
|
||||||
let hash = self.hash_key(&key);
|
|
||||||
sharding.ring.insert(hash, node_id.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Хэширование ключа
|
|
||||||
fn hash_key(&self, key: &str) -> u64 {
|
|
||||||
let mut hasher = SipHasher13::new();
|
|
||||||
key.hash(&mut hasher);
|
|
||||||
hasher.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Поиск узла для ключа
|
|
||||||
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
|
|
||||||
// Проверка наличия кластера перед поиском узла
|
|
||||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
|
||||||
return Err(crate::common::FutriixError::DatabaseError(
|
|
||||||
"Cannot find node: cluster not formed. Need at least 2 nodes.".to_string()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(sharding) = self.collections.get(collection) {
|
|
||||||
let key_hash = self.hash_key(key_value);
|
|
||||||
|
|
||||||
// Поиск в хэш-ринге (консистентное хэширование)
|
|
||||||
let mut range = sharding.ring.range(key_hash..);
|
|
||||||
if let Some((_, node_id)) = range.next() {
|
|
||||||
return Ok(Some(node_id.clone()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Если не найдено в верхней части ринга, берем первый узел
|
|
||||||
if let Some((_, node_id)) = sharding.ring.iter().next() {
|
|
||||||
return Ok(Some(node_id.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Миграция шарда
|
|
||||||
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
|
|
||||||
// Проверка наличия кластера перед миграцией
|
|
||||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
|
||||||
return Err(crate::common::FutriixError::DatabaseError(
|
|
||||||
"Cannot migrate shard: cluster not formed. Need at least 2 nodes.".to_string()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Migrating shard for collection '{}' from {} to {} with key {}",
|
|
||||||
collection, from_node, to_node, shard_key);
|
|
||||||
|
|
||||||
self.rebuild_ring(collection)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ребалансировка кластера
|
|
||||||
pub fn rebalance_cluster(&self) -> Result<()> {
|
|
||||||
// Проверка наличия кластера перед ребалансировкой
|
|
||||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
|
||||||
return Err(crate::common::FutriixError::DatabaseError(
|
|
||||||
"Cannot rebalance cluster: cluster not formed. Need at least 2 nodes.".to_string()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Rebalancing cluster with {} nodes", self.nodes.len());
|
|
||||||
|
|
||||||
// Перестраиваем все хэш-ринги
|
|
||||||
for mut entry in self.collections.iter_mut() {
|
|
||||||
let sharding = entry.value_mut();
|
|
||||||
sharding.ring.clear();
|
|
||||||
|
|
||||||
for node_entry in self.nodes.iter() {
|
|
||||||
let node_id = node_entry.key();
|
|
||||||
for i in 0..sharding.virtual_nodes {
|
|
||||||
let key = format!("{}-{}", node_id, i);
|
|
||||||
let hash = self.hash_key(&key);
|
|
||||||
sharding.ring.insert(hash, node_id.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ребалансировка узлов кластера
|
|
||||||
self.rebalance_nodes()?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ребалансировка узлов кластера
|
|
||||||
fn rebalance_nodes(&self) -> Result<()> {
|
|
||||||
println!("Rebalancing nodes in cluster...");
|
|
||||||
|
|
||||||
// Рассчитываем среднюю загрузку
|
|
||||||
let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum();
|
|
||||||
let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum();
|
|
||||||
let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 };
|
|
||||||
|
|
||||||
println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);
|
|
||||||
|
|
||||||
// TODO: Реализовать алгоритм ребалансировки узлов
|
|
||||||
// - Определить перегруженные и недогруженные узлы
|
|
||||||
// - Перераспределить данные между узлами
|
|
||||||
// - Обновить метаданные шардинга
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение статуса кластера
|
|
||||||
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
|
|
||||||
let mut cluster_nodes = Vec::new();
|
|
||||||
let mut total_capacity = 0;
|
|
||||||
let mut total_used = 0;
|
|
||||||
let mut raft_nodes = Vec::new();
|
|
||||||
|
|
||||||
for entry in self.nodes.iter() {
|
|
||||||
let node = entry.value();
|
|
||||||
total_capacity += node.capacity;
|
|
||||||
total_used += node.used;
|
|
||||||
|
|
||||||
cluster_nodes.push(protocol::ShardInfo {
|
|
||||||
node_id: node.node_id.clone(),
|
|
||||||
address: node.address.clone(),
|
|
||||||
capacity: node.capacity,
|
|
||||||
used: node.used,
|
|
||||||
collections: node.collections.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
raft_nodes.push(protocol::RaftNodeInfo {
|
|
||||||
node_id: node.node_id.clone(),
|
|
||||||
address: node.address.clone(),
|
|
||||||
state: match node.raft_info.state {
|
|
||||||
RaftState::Leader => "leader".to_string(),
|
|
||||||
RaftState::Follower => "follower".to_string(),
|
|
||||||
RaftState::Candidate => "candidate".to_string(),
|
|
||||||
},
|
|
||||||
term: node.raft_info.term,
|
|
||||||
last_heartbeat: node.raft_info.last_heartbeat,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(protocol::ClusterStatus {
|
|
||||||
nodes: cluster_nodes,
|
|
||||||
total_capacity,
|
|
||||||
total_used,
|
|
||||||
rebalance_needed: false,
|
|
||||||
cluster_formed: self.cluster_formed.load(Ordering::SeqCst),
|
|
||||||
leader_exists: self.is_leader.load(Ordering::SeqCst),
|
|
||||||
raft_nodes,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение списка Raft узлов
|
|
||||||
pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
|
|
||||||
self.nodes.iter()
|
|
||||||
.map(|entry| entry.value().raft_info.clone())
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Проверка сформированности кластера
|
|
||||||
pub fn is_cluster_formed(&self) -> bool {
|
|
||||||
self.cluster_formed.load(Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Raft выборы - начало кампании
|
|
||||||
pub fn start_election(&self) -> Result<()> {
|
|
||||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
|
||||||
return Err(crate::common::FutriixError::DatabaseError(
|
|
||||||
"Cluster not formed. Need at least 2 nodes.".to_string()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
|
|
||||||
println!("Starting election for term {}", new_term);
|
|
||||||
|
|
||||||
// Переход в состояние candidate
|
|
||||||
self.is_leader.store(false, Ordering::SeqCst);
|
|
||||||
|
|
||||||
// TODO: Реализовать полную логику Raft выборов
|
|
||||||
// - Отправка RequestVote RPC на другие узлы
|
|
||||||
// - Сбор голосов
|
|
||||||
// - Переход в состояние Leader при получении большинства
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Отправка команды на репликацию
|
|
||||||
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
|
|
||||||
if !self.replication_enabled.load(Ordering::SeqCst) {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
self.replication_tx.send(ReplicationEvent::Command(command)).await
|
|
||||||
.map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Запрос синхронизации с другими узлами
|
|
||||||
pub async fn request_sync(&self) -> Result<()> {
|
|
||||||
if !self.replication_enabled.load(Ordering::SeqCst) {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
self.replication_tx.send(ReplicationEvent::SyncRequest).await
|
|
||||||
.map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение списка узлов репликации
|
|
||||||
pub fn get_nodes(&self) -> Vec<ShardNode> {
|
|
||||||
self.nodes.iter()
|
|
||||||
.map(|entry| entry.value().clone())
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение текущего номера последовательности
|
|
||||||
pub fn get_sequence_number(&self) -> u64 {
|
|
||||||
self.sequence_number.load(Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Проверка, включена ли репликация
|
|
||||||
pub fn is_replication_enabled(&self) -> bool {
|
|
||||||
self.replication_enabled.load(Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение информации об узле
|
|
||||||
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
|
|
||||||
self.nodes.get(node_id).map(|entry| entry.value().clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user