added raft-leader checks

This commit is contained in:
Григорий Сафронов 2026-01-20 21:44:25 +00:00
parent 525e1ebcf5
commit e99344185c

View File

@ -32,32 +32,36 @@ use crate::common::protocol;
/// Состояния узла в Raft протоколе /// Состояния узла в Raft протоколе
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RaftState { pub enum RaftState {
Follower, Follower, // Подчиненный узел, получает команды от лидера
Candidate, Candidate, // Кандидат на выборах лидера
Leader, Leader, // Лидер, координирует операции репликации
} }
/// Atomic Raft состояние для атомарных операций /// Atomic Raft состояние для атомарных операций
/// Обеспечивает thread-safe доступ к состоянию Raft узла
struct AtomicRaftState { struct AtomicRaftState {
inner: AtomicU64, inner: AtomicU64, // Внутреннее атомарное представление состояния
} }
impl AtomicRaftState { impl AtomicRaftState {
/// Создает новое атомарное состояние Raft
fn new() -> Self { fn new() -> Self {
Self { Self {
inner: AtomicU64::new(0), inner: AtomicU64::new(0), // Начальное состояние: Follower
} }
} }
/// Получает текущее состояние Raft
fn get(&self) -> RaftState { fn get(&self) -> RaftState {
match self.inner.load(Ordering::Acquire) { match self.inner.load(Ordering::Acquire) {
0 => RaftState::Follower, 0 => RaftState::Follower,
1 => RaftState::Candidate, 1 => RaftState::Candidate,
2 => RaftState::Leader, 2 => RaftState::Leader,
_ => RaftState::Follower, _ => RaftState::Follower, // Значение по умолчанию при ошибке
} }
} }
/// Устанавливает новое состояние Raft
fn set(&self, state: RaftState) { fn set(&self, state: RaftState) {
let value = match state { let value = match state {
RaftState::Follower => 0, RaftState::Follower => 0,
@ -67,6 +71,8 @@ impl AtomicRaftState {
self.inner.store(value, Ordering::Release); self.inner.store(value, Ordering::Release);
} }
/// Атомарное сравнение и обмен состояния Raft
/// Возвращает Ok(new) если операция успешна, иначе Err(actual_state)
fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result<RaftState, RaftState> { fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result<RaftState, RaftState> {
let current_val = match current { let current_val = match current {
RaftState::Follower => 0, RaftState::Follower => 0,
@ -98,52 +104,53 @@ impl AtomicRaftState {
/// Информация о Raft узле /// Информация о Raft узле
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftNode { pub struct RaftNode {
pub node_id: String, pub node_id: String, // Уникальный идентификатор узла
pub address: String, pub address: String, // Сетевой адрес узла
pub state: RaftState, pub state: RaftState, // Текущее состояние узла
pub term: u64, pub term: u64, // Текущий термин Raft
pub voted_for: Option<String>, pub voted_for: Option<String>, // За кого голосовал узел в текущем терме
pub last_heartbeat: i64, pub last_heartbeat: i64, // Время последнего heartbeat (timestamp)
} }
/// Информация о шард-узле с Raft /// Информация о шард-узле с Raft
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardNode { pub struct ShardNode {
pub node_id: String, pub node_id: String, // Уникальный идентификатор узла
pub address: String, pub address: String, // Сетевой адрес узла
pub capacity: u64, pub capacity: u64, // Общая емкость узла в байтах
pub used: u64, pub used: u64, // Использованная емкость в байтах
pub collections: Vec<String>, pub collections: Vec<String>, // Коллекции, хранящиеся на узле
pub raft_info: RaftNode, pub raft_info: RaftNode, // Информация о состоянии Raft узла
} }
/// Состояние шардинга для коллекции /// Состояние шардинга для коллекции
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CollectionSharding { pub struct CollectionSharding {
pub shard_key: String, pub shard_key: String, // Ключ для шардинга
pub virtual_nodes: usize, pub virtual_nodes: usize, // Количество виртуальных узлов на физический узел
// Используем Arc<DashMap> для совместного доступа из нескольких потоков pub ring: Arc<DashMap<u64, String>>, // Консистентное хэш-кольцо для распределения данных
pub ring: Arc<DashMap<u64, String>>,
} }
/// События репликации /// События репликации для обработки в очереди
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum ReplicationEvent { pub enum ReplicationEvent {
Command(protocol::Command), Command(protocol::Command), // Команда для репликации
SyncRequest, SyncRequest, // Запрос синхронизации
Heartbeat, Heartbeat, // Heartbeat сообщение
RaftVoteRequest { term: u64, candidate_id: String }, RaftVoteRequest { term: u64, candidate_id: String }, // Запрос голоса в Raft
RaftVoteResponse { term: u64, vote_granted: bool }, RaftVoteResponse { term: u64, vote_granted: bool }, // Ответ на запрос голоса
RaftAppendEntries { term: u64, leader_id: String }, RaftAppendEntries { term: u64, leader_id: String }, // Сообщение о добавлении записей от лидера
} }
/// Lock-Free очередь репликации на основе SegQueue /// Lock-Free очередь репликации на основе SegQueue
/// Обеспечивает безопасный доступ из нескольких потоков без блокировок
struct LockFreeReplicationQueue { struct LockFreeReplicationQueue {
queue: SegQueue<ReplicationEvent>, queue: SegQueue<ReplicationEvent>, // Сегментированная lock-free очередь
size: AtomicUsize, size: AtomicUsize, // Атомарный счетчик размера очереди
} }
impl LockFreeReplicationQueue { impl LockFreeReplicationQueue {
/// Создает новую lock-free очередь
fn new() -> Self { fn new() -> Self {
Self { Self {
queue: SegQueue::new(), queue: SegQueue::new(),
@ -151,11 +158,13 @@ impl LockFreeReplicationQueue {
} }
} }
/// Добавляет событие в очередь
fn push(&self, event: ReplicationEvent) { fn push(&self, event: ReplicationEvent) {
self.queue.push(event); self.queue.push(event);
self.size.fetch_add(1, Ordering::SeqCst); self.size.fetch_add(1, Ordering::SeqCst);
} }
/// Извлекает событие из очереди
fn pop(&self) -> Option<ReplicationEvent> { fn pop(&self) -> Option<ReplicationEvent> {
let event = self.queue.pop(); let event = self.queue.pop();
if event.is_some() { if event.is_some() {
@ -164,36 +173,38 @@ impl LockFreeReplicationQueue {
event event
} }
/// Возвращает текущий размер очереди
fn len(&self) -> usize { fn len(&self) -> usize {
self.size.load(Ordering::Acquire) self.size.load(Ordering::Acquire)
} }
} }
/// Менеджер шардинга и репликации с Raft /// Менеджер шардинга и репликации с Raft
/// Координирует распределение данных по узлам кластера и обеспечивает консенсус
#[derive(Clone)] #[derive(Clone)]
pub struct ShardingManager { pub struct ShardingManager {
// Шардинг компоненты // Шардинг компоненты
nodes: Arc<DashMap<String, ShardNode>>, nodes: Arc<DashMap<String, ShardNode>>, // Узлы кластера
// Используем DashMap для атомарного доступа к настройкам шардинга коллекций collections: Arc<DashMap<String, CollectionSharding>>, // Настройки шардинга для коллекций
collections: Arc<DashMap<String, CollectionSharding>>, virtual_nodes_per_node: usize, // Виртуальных узлов на физический узел
virtual_nodes_per_node: usize, min_nodes_for_cluster: usize, // Минимальное количество узлов для формирования кластера
min_nodes_for_cluster: usize,
// Raft компоненты с atomic операциями // Raft компоненты с atomic операциями
current_term: Arc<AtomicU64>, current_term: Arc<AtomicU64>, // Текущий термин Raft
voted_for: Arc<DashMap<u64, String>>, voted_for: Arc<DashMap<u64, String>>, // Голоса по терминам
is_leader: Arc<AtomicBool>, is_leader: Arc<AtomicBool>, // Флаг лидера
raft_state: Arc<AtomicRaftState>, raft_state: Arc<AtomicRaftState>, // Текущее состояние Raft
cluster_formed: Arc<AtomicBool>, cluster_formed: Arc<AtomicBool>, // Флаг сформированного кластера
// Репликация компоненты // Репликация компоненты
replication_queue: Arc<LockFreeReplicationQueue>, replication_queue: Arc<LockFreeReplicationQueue>, // Очередь репликации
sequence_number: Arc<AtomicU64>, sequence_number: Arc<AtomicU64>, // Номер последовательности для упорядочивания
replication_enabled: Arc<AtomicBool>, replication_enabled: Arc<AtomicBool>, // Флаг включенной репликации
node_id: String, node_id: String, // Идентификатор текущего узла
} }
impl ShardingManager { impl ShardingManager {
/// Создает новый менеджер шардинга
pub fn new( pub fn new(
virtual_nodes_per_node: usize, virtual_nodes_per_node: usize,
replication_enabled: bool, replication_enabled: bool,
@ -233,6 +244,8 @@ impl ShardingManager {
manager manager
} }
/// Основной цикл репликации и выборов Raft
/// Обрабатывает heartbeat, таймауты выборов и события из очереди
async fn run_replication_loop(self) { async fn run_replication_loop(self) {
let mut heartbeat_interval = interval(Duration::from_millis(1000)); let mut heartbeat_interval = interval(Duration::from_millis(1000));
let mut election_timeout = interval(Duration::from_millis(5000)); let mut election_timeout = interval(Duration::from_millis(5000));
@ -240,6 +253,7 @@ impl ShardingManager {
loop { loop {
tokio::select! { tokio::select! {
_ = heartbeat_interval.tick() => { _ = heartbeat_interval.tick() => {
// Отправка heartbeat если текущий узел - лидер
if self.is_leader.load(Ordering::SeqCst) && if self.is_leader.load(Ordering::SeqCst) &&
self.replication_enabled.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) &&
self.cluster_formed.load(Ordering::SeqCst) { self.cluster_formed.load(Ordering::SeqCst) {
@ -247,6 +261,7 @@ impl ShardingManager {
} }
} }
_ = election_timeout.tick() => { _ = election_timeout.tick() => {
// Запуск выборов если текущий узел не лидер
if !self.is_leader.load(Ordering::SeqCst) && if !self.is_leader.load(Ordering::SeqCst) &&
self.replication_enabled.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) &&
self.cluster_formed.load(Ordering::SeqCst) { self.cluster_formed.load(Ordering::SeqCst) {
@ -254,6 +269,7 @@ impl ShardingManager {
} }
} }
_ = tokio::time::sleep(Duration::from_millis(10)) => { _ = tokio::time::sleep(Duration::from_millis(10)) => {
// Обработка событий из очереди репликации
while let Some(event) = self.replication_queue.pop() { while let Some(event) = self.replication_queue.pop() {
self.handle_replication_event(event).await; self.handle_replication_event(event).await;
} }
@ -262,6 +278,7 @@ impl ShardingManager {
} }
} }
/// Обработка события репликации
async fn handle_replication_event(&self, event: ReplicationEvent) { async fn handle_replication_event(&self, event: ReplicationEvent) {
if !self.replication_enabled.load(Ordering::SeqCst) { if !self.replication_enabled.load(Ordering::SeqCst) {
return; return;
@ -289,17 +306,21 @@ impl ShardingManager {
} }
} }
/// Репликация команды на все узлы кластера
async fn replicate_command(&self, command: protocol::Command) { async fn replicate_command(&self, command: protocol::Command) {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return; return;
} }
// Генерируем уникальный номер последовательности
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst); let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
// Получаем список всех узлов
let nodes: Vec<ShardNode> = self.nodes.iter() let nodes: Vec<ShardNode> = self.nodes.iter()
.map(|entry| entry.value().clone()) .map(|entry| entry.value().clone())
.collect(); .collect();
// Отправляем команду на все узлы кроме себя (если лидер)
for node in nodes { for node in nodes {
if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id { if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id {
continue; continue;
@ -309,6 +330,7 @@ impl ShardingManager {
let cmd_clone = command.clone(); let cmd_clone = command.clone();
let seq_clone = sequence; let seq_clone = sequence;
// Асинхронная отправка на каждый узел
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await { if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
eprintln!("Failed to replicate to {}: {}", node_addr, e); eprintln!("Failed to replicate to {}: {}", node_addr, e);
@ -317,9 +339,11 @@ impl ShardingManager {
} }
} }
/// Отправка команды на конкретный узел
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> { async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
use crate::common::protocol::{ReplicationMessage, serialize}; use crate::common::protocol::{ReplicationMessage, serialize};
// Устанавливаем соединение с узлом
let mut stream = match tokio::net::TcpStream::connect(node).await { let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
@ -328,12 +352,14 @@ impl ShardingManager {
} }
}; };
// Создаем сообщение репликации
let message = ReplicationMessage { let message = ReplicationMessage {
sequence, sequence,
command: command.clone(), command: command.clone(),
timestamp: chrono::Utc::now().timestamp(), timestamp: chrono::Utc::now().timestamp(),
}; };
// Сериализуем сообщение
let bytes = match serialize(&message) { let bytes = match serialize(&message) {
Ok(b) => b, Ok(b) => b,
Err(e) => { Err(e) => {
@ -342,6 +368,7 @@ impl ShardingManager {
} }
}; };
// Отправляем данные
if let Err(e) = stream.write_all(&bytes).await { if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send command to {}: {}", node, e); eprintln!("Failed to send command to {}: {}", node, e);
} }
@ -349,6 +376,7 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Синхронизация со всеми узлами кластера
async fn sync_with_nodes(&self) { async fn sync_with_nodes(&self) {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return; return;
@ -370,11 +398,14 @@ impl ShardingManager {
} }
} }
/// Синхронизация с конкретным узлом
async fn sync_with_node(_node: &str) -> Result<()> { async fn sync_with_node(_node: &str) -> Result<()> {
// Заглушка для синхронизации // Заглушка для синхронизации
// В реальной реализации здесь будет обмен метаданными и данными
Ok(()) Ok(())
} }
/// Отправка heartbeat сообщений на все подчиненные узлы
async fn send_heartbeat(&self) -> Result<()> { async fn send_heartbeat(&self) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return Ok(()); return Ok(());
@ -384,6 +415,7 @@ impl ShardingManager {
.map(|entry| entry.value().clone()) .map(|entry| entry.value().clone())
.collect(); .collect();
// Отправляем heartbeat всем подчиненным узлам
for node in nodes { for node in nodes {
if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id { if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id {
let node_addr = node.address.clone(); let node_addr = node.address.clone();
@ -397,6 +429,7 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Отправка heartbeat на конкретный узел
async fn send_heartbeat_to_node(node: &str) -> Result<()> { async fn send_heartbeat_to_node(node: &str) -> Result<()> {
use crate::common::protocol::{ReplicationMessage, serialize}; use crate::common::protocol::{ReplicationMessage, serialize};
@ -408,6 +441,7 @@ impl ShardingManager {
} }
}; };
// Создаем heartbeat сообщение
let heartbeat = ReplicationMessage { let heartbeat = ReplicationMessage {
sequence: 0, sequence: 0,
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() }, command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
@ -429,22 +463,27 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Обработка запроса голоса в Raft
async fn handle_vote_request(&self, term: u64, candidate_id: String) { async fn handle_vote_request(&self, term: u64, candidate_id: String) {
let current_term = self.current_term.load(Ordering::SeqCst); let current_term = self.current_term.load(Ordering::SeqCst);
if term > current_term { if term > current_term {
// Обновляем термин и голосуем за кандидата
self.current_term.store(term, Ordering::SeqCst); self.current_term.store(term, Ordering::SeqCst);
self.voted_for.insert(term, candidate_id); self.voted_for.insert(term, candidate_id);
} }
} }
/// Обработка ответа на запрос голоса в Raft
async fn handle_vote_response(&self, term: u64, vote_granted: bool) { async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
if vote_granted && term == self.current_term.load(Ordering::SeqCst) { if vote_granted && term == self.current_term.load(Ordering::SeqCst) {
let node_count = self.nodes.len(); let node_count = self.nodes.len();
// Проверяем, получено ли большинство голосов
if node_count >= self.min_nodes_for_cluster { if node_count >= self.min_nodes_for_cluster {
match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) { match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) {
Ok(_) => { Ok(_) => {
// Становимся лидером
self.is_leader.store(true, Ordering::SeqCst); self.is_leader.store(true, Ordering::SeqCst);
println!("Elected as leader for term {}", term); println!("Elected as leader for term {}", term);
} }
@ -454,16 +493,19 @@ impl ShardingManager {
} }
} }
/// Обработка сообщения AppendEntries от лидера
async fn handle_append_entries(&self, term: u64, leader_id: String) { async fn handle_append_entries(&self, term: u64, leader_id: String) {
let current_term = self.current_term.load(Ordering::SeqCst); let current_term = self.current_term.load(Ordering::SeqCst);
if term >= current_term { if term >= current_term {
// Принимаем лидера и переходим в состояние follower
self.current_term.store(term, Ordering::SeqCst); self.current_term.store(term, Ordering::SeqCst);
match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) { match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) {
Ok(_) => { Ok(_) => {
self.is_leader.store(false, Ordering::SeqCst); self.is_leader.store(false, Ordering::SeqCst);
// Обновляем информацию о состоянии узла
if let Some(mut node) = self.nodes.get_mut(&self.node_id) { if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
node.raft_info.state = RaftState::Follower; node.raft_info.state = RaftState::Follower;
node.raft_info.term = term; node.raft_info.term = term;
@ -475,7 +517,9 @@ impl ShardingManager {
} }
} }
/// Добавление узла в кластер
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> { pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
// Создаем информацию о Raft узле
let raft_node = RaftNode { let raft_node = RaftNode {
node_id: node_id.clone(), node_id: node_id.clone(),
address: address.clone(), address: address.clone(),
@ -485,6 +529,7 @@ impl ShardingManager {
last_heartbeat: chrono::Utc::now().timestamp(), last_heartbeat: chrono::Utc::now().timestamp(),
}; };
// Создаем информацию о шард-узле
let node = ShardNode { let node = ShardNode {
node_id: node_id.clone(), node_id: node_id.clone(),
address, address,
@ -494,8 +539,10 @@ impl ShardingManager {
raft_info: raft_node, raft_info: raft_node,
}; };
// Добавляем узел в кластер
self.nodes.insert(node_id, node); self.nodes.insert(node_id, node);
// Проверяем, сформирован ли кластер
let node_count = self.nodes.len(); let node_count = self.nodes.len();
if node_count >= self.min_nodes_for_cluster { if node_count >= self.min_nodes_for_cluster {
self.cluster_formed.store(true, Ordering::SeqCst); self.cluster_formed.store(true, Ordering::SeqCst);
@ -506,9 +553,11 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Удаление узла из кластера
pub fn remove_node(&self, node_id: &str) -> Result<()> { pub fn remove_node(&self, node_id: &str) -> Result<()> {
self.nodes.remove(node_id); self.nodes.remove(node_id);
// Проверяем, остался ли кластер сформированным
let node_count = self.nodes.len(); let node_count = self.nodes.len();
if node_count < self.min_nodes_for_cluster { if node_count < self.min_nodes_for_cluster {
self.cluster_formed.store(false, Ordering::SeqCst); self.cluster_formed.store(false, Ordering::SeqCst);
@ -520,6 +569,7 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Настройка шардинга для коллекции
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> { pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
@ -528,6 +578,7 @@ impl ShardingManager {
)); ));
} }
// Создаем настройки шардинга для коллекции
let sharding = CollectionSharding { let sharding = CollectionSharding {
shard_key: shard_key.to_string(), shard_key: shard_key.to_string(),
virtual_nodes: self.virtual_nodes_per_node, virtual_nodes: self.virtual_nodes_per_node,
@ -536,21 +587,25 @@ impl ShardingManager {
self.collections.insert(collection.to_string(), sharding); self.collections.insert(collection.to_string(), sharding);
// Перестраиваем хэш-кольцо для коллекции
self.rebuild_ring(collection)?; self.rebuild_ring(collection)?;
Ok(()) Ok(())
} }
/// Перестроение хэш-кольца для коллекции
fn rebuild_ring(&self, collection: &str) -> Result<()> { fn rebuild_ring(&self, collection: &str) -> Result<()> {
if let Some(mut entry) = self.collections.get_mut(collection) { if let Some(mut entry) = self.collections.get_mut(collection) {
let sharding = entry.value_mut(); let sharding = entry.value_mut();
// Очищаем ring // Очищаем существующее кольцо
sharding.ring.clear(); sharding.ring.clear();
// Получаем список всех узлов
let nodes: Vec<String> = self.nodes.iter() let nodes: Vec<String> = self.nodes.iter()
.map(|node_entry| node_entry.key().clone()) .map(|node_entry| node_entry.key().clone())
.collect(); .collect();
// Добавляем виртуальные узлы для каждого физического узла
for node_id in nodes { for node_id in nodes {
for i in 0..sharding.virtual_nodes { for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i); let key = format!("{}-{}", node_id, i);
@ -563,12 +618,14 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Хэширование ключа для консистентного хэширования
fn hash_key(&self, key: &str) -> u64 { fn hash_key(&self, key: &str) -> u64 {
let mut hasher = SipHasher13::new(); let mut hasher = SipHasher13::new();
key.hash(&mut hasher); key.hash(&mut hasher);
hasher.finish() hasher.finish()
} }
/// Поиск узла для ключа в указанной коллекции
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> { pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
@ -578,6 +635,7 @@ impl ShardingManager {
} }
if let Some(sharding) = self.collections.get(collection) { if let Some(sharding) = self.collections.get(collection) {
// Вычисляем хэш ключа
let key_hash = self.hash_key(key_value); let key_hash = self.hash_key(key_value);
// Собираем все записи в вектор // Собираем все записи в вектор
@ -595,7 +653,7 @@ impl ShardingManager {
} }
} }
// Если не нашли, возвращаем первый узел // Если не нашли, возвращаем первый узел (циклический переход)
if let Some((_, node_id)) = entries.first() { if let Some((_, node_id)) = entries.first() {
return Ok(Some(node_id.clone())); return Ok(Some(node_id.clone()));
} }
@ -604,6 +662,7 @@ impl ShardingManager {
Ok(None) Ok(None)
} }
/// Миграция шарда с одного узла на другой
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> { pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
@ -612,12 +671,14 @@ impl ShardingManager {
)); ));
} }
// Проверяем существование исходного узла
if !self.nodes.contains_key(from_node) { if !self.nodes.contains_key(from_node) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
format!("Source node '{}' not found in cluster", from_node) format!("Source node '{}' not found in cluster", from_node)
)); ));
} }
// Проверяем существование целевого узла
if !self.nodes.contains_key(to_node) { if !self.nodes.contains_key(to_node) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
format!("Destination node '{}' not found in cluster", to_node) format!("Destination node '{}' not found in cluster", to_node)
@ -627,10 +688,12 @@ impl ShardingManager {
println!("Migrating shard for collection '{}' from {} to {} with key {}", println!("Migrating shard for collection '{}' from {} to {} with key {}",
collection, from_node, to_node, shard_key); collection, from_node, to_node, shard_key);
// Перестраиваем кольцо для отражения миграции
self.rebuild_ring(collection)?; self.rebuild_ring(collection)?;
Ok(()) Ok(())
} }
/// Ребалансировка кластера
pub fn rebalance_cluster(&self) -> Result<()> { pub fn rebalance_cluster(&self) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
@ -642,16 +705,18 @@ impl ShardingManager {
let node_count = self.nodes.len(); let node_count = self.nodes.len();
println!("Rebalancing cluster with {} nodes", node_count); println!("Rebalancing cluster with {} nodes", node_count);
// Перестраиваем все rings // Перестраиваем все хэш-кольца
for key in self.collections.iter().map(|entry| entry.key().clone()).collect::<Vec<_>>() { for key in self.collections.iter().map(|entry| entry.key().clone()).collect::<Vec<_>>() {
self.rebuild_ring(&key)?; self.rebuild_ring(&key)?;
} }
// Ребалансируем узлы
self.rebalance_nodes()?; self.rebalance_nodes()?;
Ok(()) Ok(())
} }
/// Ребалансировка нагрузки между узлами
fn rebalance_nodes(&self) -> Result<()> { fn rebalance_nodes(&self) -> Result<()> {
println!("Rebalancing nodes in cluster..."); println!("Rebalancing nodes in cluster...");
@ -659,25 +724,31 @@ impl ShardingManager {
let mut total_used = 0; let mut total_used = 0;
let mut nodes_info = Vec::new(); let mut nodes_info = Vec::new();
// Собираем статистику по всем узлам
for node in self.nodes.iter() { for node in self.nodes.iter() {
total_capacity += node.capacity; total_capacity += node.capacity;
total_used += node.used; total_used += node.used;
nodes_info.push((node.node_id.clone(), node.used, node.capacity)); nodes_info.push((node.node_id.clone(), node.used, node.capacity));
} }
// Вычисляем среднюю загрузку
let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 }; let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 };
println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity); println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);
// Определяем перегруженные и недогруженные узлы
let mut overloaded_nodes = Vec::new(); let mut overloaded_nodes = Vec::new();
let mut underloaded_nodes = Vec::new(); let mut underloaded_nodes = Vec::new();
for (node_id, used, capacity) in nodes_info { for (node_id, used, capacity) in nodes_info {
let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 }; let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 };
// Узлы с загрузкой > 120% от средней считаем перегруженными
if usage > avg_usage * 1.2 { if usage > avg_usage * 1.2 {
overloaded_nodes.push((node_id, usage)); overloaded_nodes.push((node_id, usage));
} else if usage < avg_usage * 0.8 { }
// Узлы с загрузкой < 80% от средней считаем недогруженными
else if usage < avg_usage * 0.8 {
underloaded_nodes.push((node_id, usage)); underloaded_nodes.push((node_id, usage));
} }
} }
@ -685,15 +756,20 @@ impl ShardingManager {
println!("Overloaded nodes: {}", overloaded_nodes.len()); println!("Overloaded nodes: {}", overloaded_nodes.len());
println!("Underloaded nodes: {}", underloaded_nodes.len()); println!("Underloaded nodes: {}", underloaded_nodes.len());
// В реальной реализации здесь была бы логика миграции данных
// между перегруженными и недогруженными узлами
Ok(()) Ok(())
} }
/// Получение статуса кластера
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> { pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
let mut cluster_nodes = Vec::new(); let mut cluster_nodes = Vec::new();
let mut total_capacity = 0; let mut total_capacity = 0;
let mut total_used = 0; let mut total_used = 0;
let mut raft_nodes = Vec::new(); let mut raft_nodes = Vec::new();
// Собираем информацию о всех узлах
for node in self.nodes.iter() { for node in self.nodes.iter() {
total_capacity += node.capacity; total_capacity += node.capacity;
total_used += node.used; total_used += node.used;
@ -719,6 +795,7 @@ impl ShardingManager {
}); });
} }
// Определяем, нужна ли ребалансировка
let rebalance_needed = { let rebalance_needed = {
if total_capacity == 0 { if total_capacity == 0 {
false false
@ -733,6 +810,8 @@ impl ShardingManager {
0.0 0.0
}; };
// Помечаем кластер как нуждающийся в ребалансировке,
// если есть узлы с загрузкой > 120% или < 80% от средней
if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 { if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 {
needs_rebalance = true; needs_rebalance = true;
break; break;
@ -743,6 +822,7 @@ impl ShardingManager {
} }
}; };
// Формируем статус кластера
Ok(protocol::ClusterStatus { Ok(protocol::ClusterStatus {
nodes: cluster_nodes, nodes: cluster_nodes,
total_capacity, total_capacity,
@ -754,16 +834,19 @@ impl ShardingManager {
}) })
} }
/// Получение списка Raft узлов
pub fn get_raft_nodes(&self) -> Vec<RaftNode> { pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
self.nodes.iter() self.nodes.iter()
.map(|node| node.raft_info.clone()) .map(|node| node.raft_info.clone())
.collect() .collect()
} }
/// Проверка, сформирован ли кластер
pub fn is_cluster_formed(&self) -> bool { pub fn is_cluster_formed(&self) -> bool {
self.cluster_formed.load(Ordering::SeqCst) self.cluster_formed.load(Ordering::SeqCst)
} }
/// Запуск выборов лидера
pub fn start_election(&self) -> Result<()> { pub fn start_election(&self) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) { if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::ShardingError( return Err(crate::common::FutriixError::ShardingError(
@ -771,19 +854,23 @@ impl ShardingManager {
)); ));
} }
// Увеличиваем номер термина
let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
println!("Starting election for term {}", new_term); println!("Starting election for term {}", new_term);
self.is_leader.store(false, Ordering::SeqCst); self.is_leader.store(false, Ordering::SeqCst);
// Переходим в состояние кандидата
match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) { match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) {
Ok(_) => { Ok(_) => {
// Обновляем информацию о текущем узле
if let Some(mut node) = self.nodes.get_mut(&self.node_id) { if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
node.raft_info.state = RaftState::Candidate; node.raft_info.state = RaftState::Candidate;
node.raft_info.term = new_term; node.raft_info.term = new_term;
node.raft_info.voted_for = Some(self.node_id.clone()); node.raft_info.voted_for = Some(self.node_id.clone());
} }
// Отправляем запросы на голосование
self.replication_queue.push(ReplicationEvent::RaftVoteRequest { self.replication_queue.push(ReplicationEvent::RaftVoteRequest {
term: new_term, term: new_term,
candidate_id: self.node_id.clone(), candidate_id: self.node_id.clone(),
@ -797,6 +884,7 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Репликация команды
pub async fn replicate(&self, command: protocol::Command) -> Result<()> { pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
if !self.replication_enabled.load(Ordering::SeqCst) { if !self.replication_enabled.load(Ordering::SeqCst) {
return Ok(()); return Ok(());
@ -808,10 +896,12 @@ impl ShardingManager {
)); ));
} }
// Добавляем команду в очередь репликации
self.replication_queue.push(ReplicationEvent::Command(command)); self.replication_queue.push(ReplicationEvent::Command(command));
Ok(()) Ok(())
} }
/// Запрос синхронизации
pub async fn request_sync(&self) -> Result<()> { pub async fn request_sync(&self) -> Result<()> {
if !self.replication_enabled.load(Ordering::SeqCst) { if !self.replication_enabled.load(Ordering::SeqCst) {
return Ok(()); return Ok(());
@ -821,24 +911,29 @@ impl ShardingManager {
Ok(()) Ok(())
} }
/// Получение списка всех узлов
pub fn get_nodes(&self) -> Vec<ShardNode> { pub fn get_nodes(&self) -> Vec<ShardNode> {
self.nodes.iter() self.nodes.iter()
.map(|node| node.clone()) .map(|node| node.clone())
.collect() .collect()
} }
/// Получение текущего номера последовательности
pub fn get_sequence_number(&self) -> u64 { pub fn get_sequence_number(&self) -> u64 {
self.sequence_number.load(Ordering::SeqCst) self.sequence_number.load(Ordering::SeqCst)
} }
/// Проверка, включена ли репликация
pub fn is_replication_enabled(&self) -> bool { pub fn is_replication_enabled(&self) -> bool {
self.replication_enabled.load(Ordering::SeqCst) self.replication_enabled.load(Ordering::SeqCst)
} }
/// Получение информации об узле по ID
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> { pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
self.nodes.get(node_id).map(|entry| entry.clone()) self.nodes.get(node_id).map(|entry| entry.clone())
} }
/// Получение ID текущего узла
pub fn get_node_id(&self) -> &str { pub fn get_node_id(&self) -> &str {
&self.node_id &self.node_id
} }