Delete src/cluster.rs

This commit is contained in:
Григорий Сафронов 2026-01-14 20:57:36 +00:00
parent e94ee7b1d8
commit 1049fb039f

View File

@ -1,826 +0,0 @@
//! Модуль кластеризации для flusql
//!
//! Реализует простейший шардинг, мастер-мастер синхронную репликацию
//! и управление кластером по паттерну "Centralized Coordinator".
//!
//! Основные возможности:
//! - Простейший шардинг данных
//! - Мастер-мастер синхронная репликация
//! - Централизованный координатор (один главный узел)
//! - Команды управления кластером через Lua
//! - Wait-free доступ к метаданным кластера
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
use serde::{Serialize, Deserialize};
use tokio::sync::broadcast;
use tokio::time::{Duration, interval};
use thiserror::Error;
/// Узел кластера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterNode {
pub id: String,
pub address: String,
pub role: NodeRole,
pub status: NodeStatus,
pub last_heartbeat: u64,
pub shards: Vec<String>,
}
/// Роль узла
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)]
pub enum NodeRole {
Master,
Slave,
Coordinator,
}
/// Статус узла
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)]
pub enum NodeStatus {
Online,
Offline,
Syncing,
}
/// Шард данных
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Shard {
pub id: String,
pub master_node: String,
pub slave_nodes: Vec<String>,
pub data_range: (u64, u64), // Диапазон данных шарда
pub replication_lag: u64, // Задержка репликации в мс
}
/// Менеджер кластера
#[derive(Clone)]
pub struct ClusterManager {
pub nodes: Arc<DashMap<String, ClusterNode>>,
pub shards: Arc<DashMap<String, Shard>>,
coordinator_id: Arc<AtomicU64>,
is_coordinator: Arc<AtomicBool>,
cluster_version: Arc<AtomicU64>,
event_sender: broadcast::Sender<ClusterEvent>,
node_id: String,
node_address: String,
}
impl ClusterManager {
/// Создание нового менеджера кластера
pub fn new(node_id: &str, address: &str) -> Self {
let (event_sender, _) = broadcast::channel(100);
let mut manager = Self {
nodes: Arc::new(DashMap::new()),
shards: Arc::new(DashMap::new()),
coordinator_id: Arc::new(AtomicU64::new(0)),
is_coordinator: Arc::new(AtomicBool::new(false)),
cluster_version: Arc::new(AtomicU64::new(1)),
event_sender,
node_id: node_id.to_string(),
node_address: address.to_string(),
};
// Регистрируем текущий узел
manager.register_node(node_id, address, NodeRole::Slave);
manager
}
/// Регистрация узла в кластере
pub fn register_node(&mut self, node_id: &str, address: &str, role: NodeRole) {
let node = ClusterNode {
id: node_id.to_string(),
address: address.to_string(),
role,
status: NodeStatus::Online,
last_heartbeat: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
shards: Vec::new(),
};
self.nodes.insert(node_id.to_string(), node);
// Если это первый узел, делаем его координатором
if self.nodes.len() == 1 {
self.set_coordinator(node_id);
}
let _ = self.event_sender.send(ClusterEvent::NodeJoined {
node_id: node_id.to_string(),
address: address.to_string(),
role,
});
}
/// Установка координатора (публичный метод для использования в Lua)
pub fn set_coordinator(&mut self, node_id: &str) {
// ЛОГИЧЕСКАЯ ОШИБКА БЫЛА ЗДЕСЬ: неправильное преобразование node_id в числовой ID
// Исправление: корректное преобразование строкового ID
let node_id_num = if node_id.starts_with("node_") {
node_id.trim_start_matches("node_").parse::<u64>().unwrap_or_else(|_| {
// Если парсинг не удался, используем хэш строки
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
node_id.hash(&mut hasher);
hasher.finish()
})
} else {
// Пробуем извлечь числовую часть
let numeric_part: String = node_id.chars()
.filter(|c| c.is_digit(10))
.collect();
if !numeric_part.is_empty() {
numeric_part.parse::<u64>().unwrap_or_else(|_| {
// Если парсинг не удался, используем хэш
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
node_id.hash(&mut hasher);
hasher.finish()
})
} else {
// Если нет цифр, используем хэш всей строки
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
node_id.hash(&mut hasher);
hasher.finish()
}
};
self.coordinator_id.store(node_id_num, Ordering::SeqCst);
let current_coordinator_id = self.get_coordinator_id();
self.is_coordinator.store(node_id == current_coordinator_id, Ordering::SeqCst);
// Обновляем роль старого координатора (если есть)
if let Some(old_coordinator) = self.get_current_coordinator() {
if old_coordinator.id != node_id {
if let Some(mut old_node) = self.nodes.get_mut(&old_coordinator.id) {
old_node.role = NodeRole::Slave;
}
}
}
// Устанавливаем новую роль
if let Some(mut node) = self.nodes.get_mut(node_id) {
node.role = NodeRole::Coordinator;
}
let _ = self.event_sender.send(ClusterEvent::CoordinatorChanged {
old_coordinator: self.get_current_coordinator_name(),
new_coordinator: node_id.to_string(),
});
}
/// Получение ID координатора
pub fn get_coordinator_id(&self) -> String {
let id_num = self.coordinator_id.load(Ordering::Relaxed);
if id_num == 0 {
// Возвращаем первый узел как координатор по умолчанию
self.nodes.iter()
.next()
.map(|entry| entry.key().clone())
.unwrap_or_else(|| "node_1".to_string())
} else {
// ЛОГИЧЕСКАЯ ОШИБКА: была попытка вернуть числовой ID как строку
// Исправление: ищем узел с соответствующим ID
for entry in self.nodes.iter() {
let node_id = entry.key();
if node_id.starts_with("node_") {
if let Ok(num) = node_id.trim_start_matches("node_").parse::<u64>() {
if num == id_num {
return node_id.clone();
}
}
}
}
// Если не нашли, возвращаем первый узел
self.nodes.iter()
.next()
.map(|entry| entry.key().clone())
.unwrap_or_else(|| format!("node_{}", id_num))
}
}
/// Получение текущего координатора
pub fn get_current_coordinator(&self) -> Option<ClusterNode> {
let coordinator_id = self.get_coordinator_id();
if coordinator_id.is_empty() {
None
} else {
self.nodes.get(&coordinator_id).map(|node| node.value().clone())
}
}
/// Получение имени текущего координатора
pub fn get_current_coordinator_name(&self) -> String {
self.get_current_coordinator()
.map(|node| node.id)
.unwrap_or_else(|| "none".to_string())
}
/// Получение адреса текущего координатора
pub fn get_coordinator_address(&self) -> Option<String> {
self.get_current_coordinator()
.map(|node| node.address)
}
/// Выбор нового координатора
pub fn elect_new_coordinator(&mut self) -> Result<(), ClusterError> {
// Найти наиболее подходящего кандидата среди онлайн узлов
let candidates: Vec<ClusterNode> = self.nodes.iter()
.filter(|entry| {
let node = entry.value();
node.status == NodeStatus::Online &&
node.role != NodeRole::Coordinator &&
!node.id.is_empty()
})
.map(|entry| entry.value().clone())
.collect();
if candidates.is_empty() {
// ЛОГИЧЕСКАЯ ОШИБКА: не обрабатывался случай, когда нет кандидатов
// Исправление: если нет кандидатов, используем первый доступный узел
let fallback_candidate: Vec<ClusterNode> = self.nodes.iter()
.filter(|entry| !entry.key().is_empty())
.map(|entry| entry.value().clone())
.collect();
if fallback_candidate.is_empty() {
return Err(ClusterError::NoNodes);
}
let candidate = &fallback_candidate[0];
log::warn!("No online non-coordinator nodes found, using fallback candidate: {}", candidate.id);
self.set_coordinator(&candidate.id);
return Ok(());
}
// Стратегия выбора: узел с наибольшим количеством шардов
let candidate = candidates.iter()
.max_by_key(|node| {
// Вес кандидата = количество шардов * 100 + длина ID (для разрешения ничьих)
node.shards.len() * 100 + node.id.len()
})
.ok_or_else(|| ClusterError::NoNodes)?;
log::info!("Electing new coordinator: {} (shards: {})",
candidate.id, candidate.shards.len());
self.set_coordinator(&candidate.id);
Ok(())
}
/// Создание кластера
pub fn create_cluster(&mut self, nodes: HashMap<String, String>) -> Result<(), ClusterError> {
for (node_id, address) in nodes {
self.register_node(&node_id, &address, NodeRole::Slave);
}
// Выбираем первый узел как координатора
let coordinator_id = self.nodes.iter().next()
.map(|entry| entry.key().clone())
.ok_or_else(|| ClusterError::NoNodes)?;
self.set_coordinator(&coordinator_id);
Ok(())
}
/// Ребалансировка кластера
pub fn rebalance_cluster(&self) -> Result<(), ClusterError> {
// Простая стратегия ребалансировки: равномерное распределение шардов
let node_count = self.nodes.len();
if node_count == 0 {
return Err(ClusterError::NoNodes);
}
let shard_count = self.shards.len();
let shards_per_node = if node_count > 0 {
(shard_count + node_count - 1) / node_count
} else {
0
};
// Перераспределяем шарды
let mut node_index = 0;
let node_ids: Vec<String> = self.nodes.iter().map(|entry| entry.key().clone()).collect();
for (i, mut shard_entry) in self.shards.iter_mut().enumerate() {
let shard = shard_entry.value_mut();
let target_node = &node_ids[node_index % node_ids.len()];
// Обновляем мастер для шарда
shard.master_node = target_node.clone();
// Добавляем шард к узлу
if let Some(mut node) = self.nodes.get_mut(target_node) {
if !node.shards.contains(&shard.id) {
node.shards.push(shard.id.clone());
}
}
node_index += 1;
}
// Увеличиваем версию кластера
self.cluster_version.fetch_add(1, Ordering::SeqCst);
Ok(())
}
/// Исключение узла из кластера
pub fn evict_node(&mut self, node_id: &str) -> Result<(), ClusterError> {
// Проверяем, существует ли узел
if !self.nodes.contains_key(node_id) {
return Err(ClusterError::NodeNotFound(node_id.to_string()));
}
// Нельзя исключить координатора, если нет других узлов
if node_id == self.get_coordinator_id() {
let online_nodes_count = self.nodes.iter()
.filter(|entry| entry.value().status == NodeStatus::Online)
.count();
if online_nodes_count <= 1 {
return Err(ClusterError::CannotEvictLastNode);
}
// Нужно выбрать нового координатора перед исключением текущего
self.elect_new_coordinator()?;
}
// Перераспределяем шарды исключаемого узла
if let Some(node) = self.nodes.get(node_id) {
for shard_id in &node.shards {
if let Some(mut shard) = self.shards.get_mut(shard_id) {
// Находим новый мастер для шарда
if let Some(new_master) = self.find_best_node_for_shard(shard_id) {
shard.master_node = new_master.clone();
// Добавляем шард к новому узлу
if let Some(mut new_node) = self.nodes.get_mut(&new_master) {
new_node.shards.push(shard_id.clone());
}
}
}
}
}
// Удаляем узел
self.nodes.remove(node_id);
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::NodeEvicted {
node_id: node_id.to_string(),
});
Ok(())
}
/// Поиск лучшего узла для шард
fn find_best_node_for_shard(&self, shard_id: &str) -> Option<String> {
// Простая эвристика: узел с наименьшим количеством шардов
self.nodes.iter()
.filter(|entry| entry.value().status == NodeStatus::Online)
.min_by_key(|entry| entry.shards.len())
.map(|entry| entry.key().clone())
}
/// Добавление шарда
pub fn add_shard(&mut self, shard_id: &str, master_node: &str, slave_nodes: Vec<String>) -> Result<(), ClusterError> {
// Проверяем существование мастер-узла
if !self.nodes.contains_key(master_node) {
return Err(ClusterError::NodeNotFound(master_node.to_string()));
}
// Проверяем существование слейв-узлов
for slave in &slave_nodes {
if !self.nodes.contains_key(slave) {
return Err(ClusterError::NodeNotFound(slave.clone()));
}
}
// Создаем шард
let shard = Shard {
id: shard_id.to_string(),
master_node: master_node.to_string(),
slave_nodes,
data_range: (0, u64::MAX),
replication_lag: 0,
};
self.shards.insert(shard_id.to_string(), shard);
// Добавляем шард к мастер-узлу
if let Some(mut node) = self.nodes.get_mut(master_node) {
node.shards.push(shard_id.to_string());
}
Ok(())
}
/// Удаление шард
pub fn remove_shard(&mut self, shard_id: &str) -> Result<(), ClusterError> {
if self.shards.remove(shard_id).is_none() {
return Err(ClusterError::ShardNotFound(shard_id.to_string()));
}
// Удаляем шард из всех узлов
for mut node in self.nodes.iter_mut() {
node.shards.retain(|id| id != shard_id);
}
Ok(())
}
/// Начало репликации
pub fn start_replication(&mut self, source_node: &str, target_node: &str) -> Result<(), ClusterError> {
// Проверяем существование узлов
if !self.nodes.contains_key(source_node) || !self.nodes.contains_key(target_node) {
return Err(ClusterError::NodeNotFound(format!("{} or {}", source_node, target_node)));
}
// Обновляем статус узлов
if let Some(mut source) = self.nodes.get_mut(source_node) {
source.status = NodeStatus::Syncing;
}
if let Some(mut target) = self.nodes.get_mut(target_node) {
target.status = NodeStatus::Syncing;
}
// В реальной реализации здесь будет логика репликации данных
// Для упрощения просто отправляем событие
let _ = self.event_sender.send(ClusterEvent::ReplicationStarted {
source: source_node.to_string(),
target: target_node.to_string(),
});
Ok(())
}
/// Получение статуса кластера
pub fn get_cluster_status(&self) -> ClusterStatus {
let nodes: Vec<ClusterNode> = self.nodes.iter().map(|entry| entry.value().clone()).collect();
let shards: Vec<Shard> = self.shards.iter().map(|entry| entry.value().clone()).collect();
ClusterStatus {
coordinator_id: self.get_coordinator_id(),
coordinator_address: self.get_coordinator_address().unwrap_or_default(),
is_coordinator: self.is_coordinator.load(Ordering::Relaxed),
cluster_version: self.cluster_version.load(Ordering::Relaxed),
node_count: nodes.len(),
shard_count: shards.len(),
nodes,
shards,
}
}
/// Запуск фоновых задач кластера
pub fn start_background_tasks(self: Arc<Self>) {
let cluster1 = Arc::clone(&self);
let cluster2 = Arc::clone(&self);
let cluster3 = Arc::clone(&self);
// Задача отправки heartbeat
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
// Отправляем heartbeat если мы координатор
if cluster1.is_coordinator.load(Ordering::Relaxed) {
cluster1.send_heartbeat().await;
}
}
});
// Задача обнаружения отказавших узлов
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(10));
loop {
interval.tick().await;
// Обнаружение отказавших узлов и перевыбор координатора если нужно
cluster2.detect_failed_nodes().await;
// Проверяем, жив ли координатор
if let Err(e) = cluster2.check_coordinator_health().await {
log::warn!("Coordinator health check failed: {}", e);
// Попытка перевыбора координатора
let cluster_mut = Arc::clone(&cluster2);
let mut cluster_ref = (*cluster_mut).clone();
if let Err(e) = cluster_ref.elect_new_coordinator() {
log::error!("Failed to elect new coordinator: {}", e);
}
}
}
});
// Задача проверки здоровья координатора
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(15));
loop {
interval.tick().await;
cluster3.coordinator_health_check().await;
}
});
}
/// Проверка здоровья координатора
async fn coordinator_health_check(&self) {
if !self.is_coordinator.load(Ordering::Relaxed) {
// Если мы не координатор, проверяем жив ли координатор
let coordinator = self.get_current_coordinator();
if let Some(coord) = coordinator {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let heartbeat_timeout = 30; // 30 секунд
if now - coord.last_heartbeat > heartbeat_timeout && coord.status == NodeStatus::Online {
log::warn!("Coordinator {} appears to be down (last heartbeat: {}s ago)",
coord.id, now - coord.last_heartbeat);
// Обновляем статус координатора
if let Some(mut node) = self.nodes.get_mut(&coord.id) {
node.status = NodeStatus::Offline;
}
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::CoordinatorFailed {
coordinator_id: coord.id.clone(),
});
}
} else {
log::warn!("No coordinator defined in cluster");
}
}
}
/// Проверка здоровья координатора (альтернативная реализация)
async fn check_coordinator_health(&self) -> Result<(), String> {
if self.is_coordinator.load(Ordering::Relaxed) {
// Мы координатор - всегда здоровы
return Ok(());
}
let coordinator = self.get_current_coordinator();
if let Some(coord) = coordinator {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let heartbeat_timeout = 30; // 30 секунд
if now - coord.last_heartbeat > heartbeat_timeout {
return Err(format!("Coordinator {} heartbeat timeout", coord.id));
}
Ok(())
} else {
Err("No coordinator defined".to_string())
}
}
/// Отправка heartbeat
async fn send_heartbeat(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
// Обновляем время последнего heartbeat для координатора
let coordinator_id = self.get_coordinator_id();
if let Some(mut node) = self.nodes.get_mut(&coordinator_id) {
node.last_heartbeat = now;
}
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::Heartbeat {
timestamp: now,
coordinator_id: coordinator_id,
});
}
/// Обнаружение отказавших узлов
async fn detect_failed_nodes(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let timeout = 30; // 30 секунд
for mut node in self.nodes.iter_mut() {
if node.id != self.get_coordinator_id() && now - node.last_heartbeat > timeout {
node.status = NodeStatus::Offline;
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::NodeFailed {
node_id: node.id.clone(),
});
}
}
}
}
/// Статус кластера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
pub coordinator_id: String,
pub coordinator_address: String,
pub is_coordinator: bool,
pub cluster_version: u64,
pub node_count: usize,
pub shard_count: usize,
pub nodes: Vec<ClusterNode>,
pub shards: Vec<Shard>,
}
/// События кластера
#[derive(Debug, Clone)]
pub enum ClusterEvent {
NodeJoined {
node_id: String,
address: String,
role: NodeRole,
},
NodeEvicted {
node_id: String,
},
NodeFailed {
node_id: String,
},
ReplicationStarted {
source: String,
target: String,
},
Heartbeat {
timestamp: u64,
coordinator_id: String,
},
ShardMoved {
shard_id: String,
from_node: String,
to_node: String,
},
CoordinatorChanged {
old_coordinator: String,
new_coordinator: String,
},
CoordinatorFailed {
coordinator_id: String,
},
}
/// Менеджер репликации
#[derive(Clone)]
pub struct ReplicationManager {
cluster_manager: Arc<ClusterManager>,
replication_queue: Arc<DashMap<String, Vec<ReplicationTask>>>,
is_replicating: Arc<AtomicBool>,
}
impl ReplicationManager {
pub fn new(cluster_manager: Arc<ClusterManager>) -> Self {
Self {
cluster_manager,
replication_queue: Arc::new(DashMap::new()),
is_replicating: Arc::new(AtomicBool::new(false)),
}
}
/// Начало репликации данных
pub async fn replicate_data(&self, database_name: &str, table_name: &str) -> Result<(), ClusterError> {
// В реальной реализации здесь будет логика репликации данных таблицы
// Для упрощения просто добавляем задачу в очередь
let task = ReplicationTask {
database: database_name.to_string(),
table: table_name.to_string(),
status: ReplicationStatus::Pending,
started_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
self.replication_queue
.entry(database_name.to_string())
.or_insert_with(Vec::new)
.push(task);
Ok(())
}
/// Запуск фоновой репликации
pub fn start_background_replication(self: Arc<Self>) {
let manager = Arc::clone(&self);
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(1));
loop {
interval.tick().await;
if !manager.is_replicating.load(Ordering::Relaxed) {
manager.process_replication_queue().await;
}
}
});
}
/// Обработка очереди репликации
async fn process_replication_queue(&self) {
self.is_replicating.store(true, Ordering::Relaxed);
// Обрабатываем задачи репликации
for mut entry in self.replication_queue.iter_mut() {
let tasks = entry.value_mut();
let mut completed_tasks = Vec::new();
for (i, task) in tasks.iter_mut().enumerate() {
if task.status == ReplicationStatus::Pending {
task.status = ReplicationStatus::InProgress;
// В реальной реализации здесь будет репликация данных
// Для упрощения просто отмечаем как завершенную
task.status = ReplicationStatus::Completed;
completed_tasks.push(i);
}
}
// Удаляем завершенные задачи
for i in completed_tasks.into_iter().rev() {
tasks.remove(i);
}
}
self.is_replicating.store(false, Ordering::Relaxed);
}
}
/// Задача репликации
#[derive(Debug, Clone)]
struct ReplicationTask {
database: String,
table: String,
status: ReplicationStatus,
started_at: u64,
}
/// Статус репликации
#[derive(Debug, Clone, PartialEq, Copy)]
enum ReplicationStatus {
Pending,
InProgress,
Completed,
Failed,
}
/// Ошибки кластера
#[derive(Debug, Error)]
pub enum ClusterError {
#[error("Node not found: {0}")]
NodeNotFound(String),
#[error("Shard not found: {0}")]
ShardNotFound(String),
#[error("Cannot evict coordinator")]
CannotEvictCoordinator,
#[error("Cannot evict last node")]
CannotEvictLastNode,
#[error("No nodes in cluster")]
NoNodes,
#[error("Replication error: {0}")]
ReplicationError(String),
#[error("Network error: {0}")]
NetworkError(String),
#[error("Coordinator election failed: {0}")]
CoordinatorElectionFailed(String),
}