flusql/src/cluster.rs

790 lines
29 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Модуль кластеризации для flusql
//!
//! Реализует простейший шардинг, мастер-мастер синхронную репликацию
//! и управление кластером по паттерну "Centralized Coordinator".
//!
//! Основные возможности:
//! - Простейший шардинг данных
//! - Мастер-мастер синхронная репликация
//! - Централизованный координатор (один главный узел)
//! - Команды управления кластером через Lua
//! - Wait-free доступ к метаданным кластера
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
use serde::{Serialize, Deserialize};
use tokio::sync::broadcast;
use tokio::time::{Duration, interval};
use thiserror::Error;
/// Узел кластера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterNode {
pub id: String,
pub address: String,
pub role: NodeRole,
pub status: NodeStatus,
pub last_heartbeat: u64,
pub shards: Vec<String>,
}
/// Роль узла
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)]
pub enum NodeRole {
Master,
Slave,
Coordinator,
}
/// Статус узла
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Copy)]
pub enum NodeStatus {
Online,
Offline,
Syncing,
}
/// Шард данных
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Shard {
pub id: String,
pub master_node: String,
pub slave_nodes: Vec<String>,
pub data_range: (u64, u64), // Диапазон данных шарда
pub replication_lag: u64, // Задержка репликации в мс
}
/// Менеджер кластера
#[derive(Clone)]
pub struct ClusterManager {
pub nodes: Arc<DashMap<String, ClusterNode>>,
pub shards: Arc<DashMap<String, Shard>>,
coordinator_id: Arc<AtomicU64>,
is_coordinator: Arc<AtomicBool>,
cluster_version: Arc<AtomicU64>,
event_sender: broadcast::Sender<ClusterEvent>,
node_id: String,
node_address: String,
}
impl ClusterManager {
/// Создание нового менеджера кластера
pub fn new(node_id: &str, address: &str) -> Self {
let (event_sender, _) = broadcast::channel(100);
let mut manager = Self {
nodes: Arc::new(DashMap::new()),
shards: Arc::new(DashMap::new()),
coordinator_id: Arc::new(AtomicU64::new(0)),
is_coordinator: Arc::new(AtomicBool::new(false)),
cluster_version: Arc::new(AtomicU64::new(1)),
event_sender,
node_id: node_id.to_string(),
node_address: address.to_string(),
};
// Регистрируем текущий узел
manager.register_node(node_id, address, NodeRole::Slave);
manager
}
/// Регистрация узла в кластере
pub fn register_node(&mut self, node_id: &str, address: &str, role: NodeRole) {
let node = ClusterNode {
id: node_id.to_string(),
address: address.to_string(),
role,
status: NodeStatus::Online,
last_heartbeat: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
shards: Vec::new(),
};
self.nodes.insert(node_id.to_string(), node);
// Если это первый узел, делаем его координатором
if self.nodes.len() == 1 {
self.set_coordinator(node_id);
}
let _ = self.event_sender.send(ClusterEvent::NodeJoined {
node_id: node_id.to_string(),
address: address.to_string(),
role,
});
}
/// Установка координатора (публичный метод для использования в Lua)
pub fn set_coordinator(&mut self, node_id: &str) {
// Исправление: храним строковый ID координатора, а не числовой
// Преобразуем строку в хэш для внутреннего использования
let coordinator_hash = self.hash_node_id(node_id);
self.coordinator_id.store(coordinator_hash, Ordering::SeqCst);
// Обновляем флаг is_coordinator для текущего узла
let is_current_node_coordinator = node_id == self.node_id;
self.is_coordinator.store(is_current_node_coordinator, Ordering::SeqCst);
// Обновляем роль старого координатора (если есть)
if let Some(old_coordinator) = self.get_current_coordinator() {
if old_coordinator.id != node_id {
if let Some(mut old_node) = self.nodes.get_mut(&old_coordinator.id) {
old_node.role = NodeRole::Slave;
}
}
}
// Устанавливаем новую роль
if let Some(mut node) = self.nodes.get_mut(node_id) {
node.role = NodeRole::Coordinator;
}
let _ = self.event_sender.send(ClusterEvent::CoordinatorChanged {
old_coordinator: self.get_current_coordinator_name(),
new_coordinator: node_id.to_string(),
});
}
/// Хэширование ID узла для внутреннего использования
fn hash_node_id(&self, node_id: &str) -> u64 {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
node_id.hash(&mut hasher);
hasher.finish()
}
/// Получение ID координатора (строковый идентификатор)
pub fn get_coordinator_id(&self) -> String {
// Возвращаем имя узла-координатора
self.get_current_coordinator_name()
}
/// Получение текущего координатора
pub fn get_current_coordinator(&self) -> Option<ClusterNode> {
// Ищем узел с ролью Coordinator
for entry in self.nodes.iter() {
let node = entry.value();
if node.role == NodeRole::Coordinator {
return Some(node.clone());
}
}
// Если не нашли координатора, выбираем первый доступный узел
self.nodes.iter()
.next()
.map(|entry| entry.value().clone())
}
/// Получение имени текущего координатора
pub fn get_current_coordinator_name(&self) -> String {
self.get_current_coordinator()
.map(|node| node.id)
.unwrap_or_else(|| "none".to_string())
}
/// Получение адреса текущего координатора
pub fn get_coordinator_address(&self) -> Option<String> {
self.get_current_coordinator()
.map(|node| node.address)
}
/// Выбор нового координатора
pub fn elect_new_coordinator(&mut self) -> Result<(), ClusterError> {
// Найти наиболее подходящего кандидата среди онлайн узлов
let candidates: Vec<ClusterNode> = self.nodes.iter()
.filter(|entry| {
let node = entry.value();
node.status == NodeStatus::Online &&
node.role != NodeRole::Coordinator &&
!node.id.is_empty()
})
.map(|entry| entry.value().clone())
.collect();
if candidates.is_empty() {
// Исправление: обрабатываем случай, когда нет кандидатов
// Ищем любой онлайн узел, включая текущего координатора
let fallback_candidate: Vec<ClusterNode> = self.nodes.iter()
.filter(|entry| {
let node = entry.value();
node.status == NodeStatus::Online && !node.id.is_empty()
})
.map(|entry| entry.value().clone())
.collect();
if fallback_candidate.is_empty() {
return Err(ClusterError::NoNodes);
}
let candidate = &fallback_candidate[0];
log::warn!("No online non-coordinator nodes found, using fallback candidate: {}", candidate.id);
self.set_coordinator(&candidate.id);
return Ok(());
}
// Стратегия выбора: узел с наибольшим количеством шардов
let candidate = candidates.iter()
.max_by_key(|node| {
// Вес кандидата = количество шардов * 100 + длина ID (для разрешения ничьих)
node.shards.len() * 100 + node.id.len()
})
.ok_or_else(|| ClusterError::NoNodes)?;
log::info!("Electing new coordinator: {} (shards: {})",
candidate.id, candidate.shards.len());
self.set_coordinator(&candidate.id);
Ok(())
}
/// Создание кластера
pub fn create_cluster(&mut self, nodes: HashMap<String, String>) -> Result<(), ClusterError> {
for (node_id, address) in nodes {
self.register_node(&node_id, &address, NodeRole::Slave);
}
// Выбираем первый узел как координатора
let coordinator_id = self.nodes.iter().next()
.map(|entry| entry.key().clone())
.ok_or_else(|| ClusterError::NoNodes)?;
self.set_coordinator(&coordinator_id);
Ok(())
}
/// Ребалансировка кластера
pub fn rebalance_cluster(&self) -> Result<(), ClusterError> {
// Простая стратегия ребалансировки: равномерное распределение шардов
let node_count = self.nodes.len();
if node_count == 0 {
return Err(ClusterError::NoNodes);
}
let shard_count = self.shards.len();
let shards_per_node = if node_count > 0 {
(shard_count + node_count - 1) / node_count
} else {
0
};
// Перераспределяем шарды
let mut node_index = 0;
let node_ids: Vec<String> = self.nodes.iter().map(|entry| entry.key().clone()).collect();
for (i, mut shard_entry) in self.shards.iter_mut().enumerate() {
let shard = shard_entry.value_mut();
let target_node = &node_ids[node_index % node_ids.len()];
// Обновляем мастер для шарда
shard.master_node = target_node.clone();
// Добавляем шард к узлу
if let Some(mut node) = self.nodes.get_mut(target_node) {
if !node.shards.contains(&shard.id) {
node.shards.push(shard.id.clone());
}
}
node_index += 1;
}
// Увеличиваем версию кластера
self.cluster_version.fetch_add(1, Ordering::SeqCst);
Ok(())
}
/// Исключение узла из кластера
pub fn evict_node(&mut self, node_id: &str) -> Result<(), ClusterError> {
// Проверяем, существует ли узел
if !self.nodes.contains_key(node_id) {
return Err(ClusterError::NodeNotFound(node_id.to_string()));
}
// Нельзя исключить координатора, если нет других узлов
if node_id == self.get_coordinator_id() {
let online_nodes_count = self.nodes.iter()
.filter(|entry| entry.value().status == NodeStatus::Online)
.count();
if online_nodes_count <= 1 {
return Err(ClusterError::CannotEvictLastNode);
}
// Нужно выбрать нового координатора перед исключением текущего
self.elect_new_coordinator()?;
}
// Перераспределяем шарды исключаемого узла
if let Some(node) = self.nodes.get(node_id) {
for shard_id in &node.shards {
if let Some(mut shard) = self.shards.get_mut(shard_id) {
// Находим новый мастер для шарда
if let Some(new_master) = self.find_best_node_for_shard(shard_id) {
shard.master_node = new_master.clone();
// Добавляем шард к новому узлу
if let Some(mut new_node) = self.nodes.get_mut(&new_master) {
new_node.shards.push(shard_id.clone());
}
}
}
}
}
// Удаляем узел
self.nodes.remove(node_id);
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::NodeEvicted {
node_id: node_id.to_string(),
});
Ok(())
}
/// Поиск лучшего узла для шард
fn find_best_node_for_shard(&self, shard_id: &str) -> Option<String> {
// Простая эвристика: узел с наименьшим количеством шардов
self.nodes.iter()
.filter(|entry| entry.value().status == NodeStatus::Online)
.min_by_key(|entry| entry.shards.len())
.map(|entry| entry.key().clone())
}
/// Добавление шарда
pub fn add_shard(&mut self, shard_id: &str, master_node: &str, slave_nodes: Vec<String>) -> Result<(), ClusterError> {
// Проверяем существование мастер-узла
if !self.nodes.contains_key(master_node) {
return Err(ClusterError::NodeNotFound(master_node.to_string()));
}
// Проверяем существование слейв-узлов
for slave in &slave_nodes {
if !self.nodes.contains_key(slave) {
return Err(ClusterError::NodeNotFound(slave.clone()));
}
}
// Создаем шард
let shard = Shard {
id: shard_id.to_string(),
master_node: master_node.to_string(),
slave_nodes,
data_range: (0, u64::MAX),
replication_lag: 0,
};
self.shards.insert(shard_id.to_string(), shard);
// Добавляем шард к мастер-узлу
if let Some(mut node) = self.nodes.get_mut(master_node) {
node.shards.push(shard_id.to_string());
}
Ok(())
}
/// Удаление шард
pub fn remove_shard(&mut self, shard_id: &str) -> Result<(), ClusterError> {
if self.shards.remove(shard_id).is_none() {
return Err(ClusterError::ShardNotFound(shard_id.to_string()));
}
// Удаляем шард из всех узлов
for mut node in self.nodes.iter_mut() {
node.shards.retain(|id| id != shard_id);
}
Ok(())
}
/// Начало репликации
pub fn start_replication(&mut self, source_node: &str, target_node: &str) -> Result<(), ClusterError> {
// Проверяем существование узлов
if !self.nodes.contains_key(source_node) || !self.nodes.contains_key(target_node) {
return Err(ClusterError::NodeNotFound(format!("{} or {}", source_node, target_node)));
}
// Обновляем статус узлов
if let Some(mut source) = self.nodes.get_mut(source_node) {
source.status = NodeStatus::Syncing;
}
if let Some(mut target) = self.nodes.get_mut(target_node) {
target.status = NodeStatus::Syncing;
}
// В реальной реализации здесь будет логика репликации данных
// Для упрощения просто отправляем событие
let _ = self.event_sender.send(ClusterEvent::ReplicationStarted {
source: source_node.to_string(),
target: target_node.to_string(),
});
Ok(())
}
/// Получение статуса кластера
pub fn get_cluster_status(&self) -> ClusterStatus {
let nodes: Vec<ClusterNode> = self.nodes.iter().map(|entry| entry.value().clone()).collect();
let shards: Vec<Shard> = self.shards.iter().map(|entry| entry.value().clone()).collect();
ClusterStatus {
coordinator_id: self.get_coordinator_id(),
coordinator_address: self.get_coordinator_address().unwrap_or_default(),
is_coordinator: self.is_coordinator.load(Ordering::Relaxed),
cluster_version: self.cluster_version.load(Ordering::Relaxed),
node_count: nodes.len(),
shard_count: shards.len(),
nodes,
shards,
}
}
/// Запуск фоновых задач кластера
pub fn start_background_tasks(self: Arc<Self>) {
let cluster1 = Arc::clone(&self);
let cluster2 = Arc::clone(&self);
let cluster3 = Arc::clone(&self);
// Задача отправки heartbeat
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
// Отправляем heartbeat если мы координатор
if cluster1.is_coordinator.load(Ordering::Relaxed) {
cluster1.send_heartbeat().await;
}
}
});
// Задача обнаружения отказавших узлов
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(10));
loop {
interval.tick().await;
// Обнаружение отказавших узлов и перевыбор координатора если нужно
cluster2.detect_failed_nodes().await;
// Проверяем, жив ли координатор
if let Err(e) = cluster2.check_coordinator_health().await {
log::warn!("Coordinator health check failed: {}", e);
// Попытка перевыбора координатора
let cluster_mut = Arc::clone(&cluster2);
let mut cluster_ref = (*cluster_mut).clone();
if let Err(e) = cluster_ref.elect_new_coordinator() {
log::error!("Failed to elect new coordinator: {}", e);
}
}
}
});
// Задача проверки здоровья координатора
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(15));
loop {
interval.tick().await;
cluster3.coordinator_health_check().await;
}
});
}
/// Проверка здоровья координатора
async fn coordinator_health_check(&self) {
if !self.is_coordinator.load(Ordering::Relaxed) {
// Если мы не координатор, проверяем жив ли координатор
let coordinator = self.get_current_coordinator();
if let Some(coord) = coordinator {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let heartbeat_timeout = 30; // 30 секунд
if now - coord.last_heartbeat > heartbeat_timeout && coord.status == NodeStatus::Online {
log::warn!("Coordinator {} appears to be down (last heartbeat: {}s ago)",
coord.id, now - coord.last_heartbeat);
// Обновляем статус координатора
if let Some(mut node) = self.nodes.get_mut(&coord.id) {
node.status = NodeStatus::Offline;
}
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::CoordinatorFailed {
coordinator_id: coord.id.clone(),
});
}
} else {
log::warn!("No coordinator defined in cluster");
}
}
}
/// Проверка здоровья координатора (альтернативная реализация)
async fn check_coordinator_health(&self) -> Result<(), String> {
if self.is_coordinator.load(Ordering::Relaxed) {
// Мы координатор - всегда здоровы
return Ok(());
}
let coordinator = self.get_current_coordinator();
if let Some(coord) = coordinator {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let heartbeat_timeout = 30; // 30 секунд
if now - coord.last_heartbeat > heartbeat_timeout {
return Err(format!("Coordinator {} heartbeat timeout", coord.id));
}
Ok(())
} else {
Err("No coordinator defined".to_string())
}
}
/// Отправка heartbeat
async fn send_heartbeat(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
// Обновляем время последнего heartbeat для координатора
let coordinator_id = self.get_coordinator_id();
if let Some(mut node) = self.nodes.get_mut(&coordinator_id) {
node.last_heartbeat = now;
}
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::Heartbeat {
timestamp: now,
coordinator_id: coordinator_id,
});
}
/// Обнаружение отказавших узлов
async fn detect_failed_nodes(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let timeout = 30; // 30 секунд
for mut node in self.nodes.iter_mut() {
if node.id != self.get_coordinator_id() && now - node.last_heartbeat > timeout {
node.status = NodeStatus::Offline;
// Отправляем событие
let _ = self.event_sender.send(ClusterEvent::NodeFailed {
node_id: node.id.clone(),
});
}
}
}
}
/// Статус кластера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
pub coordinator_id: String,
pub coordinator_address: String,
pub is_coordinator: bool,
pub cluster_version: u64,
pub node_count: usize,
pub shard_count: usize,
pub nodes: Vec<ClusterNode>,
pub shards: Vec<Shard>,
}
/// События кластера
#[derive(Debug, Clone)]
pub enum ClusterEvent {
NodeJoined {
node_id: String,
address: String,
role: NodeRole,
},
NodeEvicted {
node_id: String,
},
NodeFailed {
node_id: String,
},
ReplicationStarted {
source: String,
target: String,
},
Heartbeat {
timestamp: u64,
coordinator_id: String,
},
ShardMoved {
shard_id: String,
from_node: String,
to_node: String,
},
CoordinatorChanged {
old_coordinator: String,
new_coordinator: String,
},
CoordinatorFailed {
coordinator_id: String,
},
}
/// Менеджер репликации
#[derive(Clone)]
pub struct ReplicationManager {
cluster_manager: Arc<ClusterManager>,
replication_queue: Arc<DashMap<String, Vec<ReplicationTask>>>,
is_replicating: Arc<AtomicBool>,
}
impl ReplicationManager {
pub fn new(cluster_manager: Arc<ClusterManager>) -> Self {
Self {
cluster_manager,
replication_queue: Arc::new(DashMap::new()),
is_replicating: Arc::new(AtomicBool::new(false)),
}
}
/// Начало репликации данных
pub async fn replicate_data(&self, database_name: &str, table_name: &str) -> Result<(), ClusterError> {
// В реальной реализации здесь будет логика репликации данных таблицы
// Для упрощения просто добавляем задачу в очередь
let task = ReplicationTask {
database: database_name.to_string(),
table: table_name.to_string(),
status: ReplicationStatus::Pending,
started_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
self.replication_queue
.entry(database_name.to_string())
.or_insert_with(Vec::new)
.push(task);
Ok(())
}
/// Запуск фоновой репликации
pub fn start_background_replication(self: Arc<Self>) {
let manager = Arc::clone(&self);
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(1));
loop {
interval.tick().await;
if !manager.is_replicating.load(Ordering::Relaxed) {
manager.process_replication_queue().await;
}
}
});
}
/// Обработка очереди репликации
async fn process_replication_queue(&self) {
self.is_replicating.store(true, Ordering::Relaxed);
// Обрабатываем задачи репликации
for mut entry in self.replication_queue.iter_mut() {
let tasks = entry.value_mut();
let mut completed_tasks = Vec::new();
for (i, task) in tasks.iter_mut().enumerate() {
if task.status == ReplicationStatus::Pending {
task.status = ReplicationStatus::InProgress;
// В реальной реализации здесь будет репликация данных
// Для упрощения просто отмечаем как завершенную
task.status = ReplicationStatus::Completed;
completed_tasks.push(i);
}
}
// Удаляем завершенные задачи
for i in completed_tasks.into_iter().rev() {
tasks.remove(i);
}
}
self.is_replicating.store(false, Ordering::Relaxed);
}
}
/// Задача репликации
#[derive(Debug, Clone)]
struct ReplicationTask {
database: String,
table: String,
status: ReplicationStatus,
started_at: u64,
}
/// Статус репликации
#[derive(Debug, Clone, PartialEq, Copy)]
enum ReplicationStatus {
Pending,
InProgress,
Completed,
Failed,
}
/// Ошибки кластера
#[derive(Debug, Error)]
pub enum ClusterError {
#[error("Node not found: {0}")]
NodeNotFound(String),
#[error("Shard not found: {0}")]
ShardNotFound(String),
#[error("Cannot evict coordinator")]
CannotEvictCoordinator,
#[error("Cannot evict last node")]
CannotEvictLastNode,
#[error("No nodes in cluster")]
NoNodes,
#[error("Replication error: {0}")]
ReplicationError(String),
#[error("Network error: {0}")]
NetworkError(String),
#[error("Coordinator election failed: {0}")]
CoordinatorElectionFailed(String),
}