Upload files to "src/server"
This commit is contained in:
parent
304fb24386
commit
f58d6969ff
978
src/server/sharding.rs
Normal file
978
src/server/sharding.rs
Normal file
@ -0,0 +1,978 @@
|
|||||||
|
// src/server/sharding.rs
|
||||||
|
//! Модуль шардинга с консистентным хэшированием и Raft протоколом
|
||||||
|
//!
|
||||||
|
//! Объединяет функционал шардинга и репликации с lock-free архитектурой
|
||||||
|
//! и реализацией Raft консенсуса для работы в production.
|
||||||
|
|
||||||
|
use std::collections::{HashMap, BTreeMap};
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::time::{interval, Duration};
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use siphasher::sip::SipHasher13;
|
||||||
|
use crossbeam::queue::SegQueue;
|
||||||
|
use crossbeam::epoch::{self, Atomic, Owned, Guard};
|
||||||
|
|
||||||
|
use crate::common::Result;
|
||||||
|
use crate::common::protocol;
|
||||||
|
|
||||||
|
/// Состояния узла в Raft протоколе
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub enum RaftState {
|
||||||
|
Follower,
|
||||||
|
Candidate,
|
||||||
|
Leader,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomic Raft состояние
|
||||||
|
struct AtomicRaftState {
|
||||||
|
inner: AtomicU64, // 0=Follower, 1=Candidate, 2=Leader
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AtomicRaftState {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: AtomicU64::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self) -> RaftState {
|
||||||
|
match self.inner.load(Ordering::Acquire) {
|
||||||
|
0 => RaftState::Follower,
|
||||||
|
1 => RaftState::Candidate,
|
||||||
|
2 => RaftState::Leader,
|
||||||
|
_ => RaftState::Follower,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set(&self, state: RaftState) {
|
||||||
|
let value = match state {
|
||||||
|
RaftState::Follower => 0,
|
||||||
|
RaftState::Candidate => 1,
|
||||||
|
RaftState::Leader => 2,
|
||||||
|
};
|
||||||
|
self.inner.store(value, Ordering::Release);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result<RaftState, RaftState> {
|
||||||
|
let current_val = match current {
|
||||||
|
RaftState::Follower => 0,
|
||||||
|
RaftState::Candidate => 1,
|
||||||
|
RaftState::Leader => 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_val = match new {
|
||||||
|
RaftState::Follower => 0,
|
||||||
|
RaftState::Candidate => 1,
|
||||||
|
RaftState::Leader => 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.inner.compare_exchange(current_val, new_val, order, Ordering::Relaxed) {
|
||||||
|
Ok(_) => Ok(new),
|
||||||
|
Err(actual_val) => {
|
||||||
|
let actual_state = match actual_val {
|
||||||
|
0 => RaftState::Follower,
|
||||||
|
1 => RaftState::Candidate,
|
||||||
|
2 => RaftState::Leader,
|
||||||
|
_ => RaftState::Follower,
|
||||||
|
};
|
||||||
|
Err(actual_state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Информация о Raft узле с atomic операциями
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct RaftNode {
|
||||||
|
pub node_id: String,
|
||||||
|
pub address: String,
|
||||||
|
pub state: RaftState,
|
||||||
|
pub term: u64,
|
||||||
|
pub voted_for: Option<String>,
|
||||||
|
pub last_heartbeat: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lock-Free хэш-таблица для узлов
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct LockFreeNodeMap {
|
||||||
|
inner: Atomic<HashMap<String, ShardNode>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LockFreeNodeMap {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Atomic::new(HashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&self, key: String, value: ShardNode, guard: &Guard) -> Option<ShardNode> {
|
||||||
|
loop {
|
||||||
|
let current = self.inner.load(Ordering::Acquire, guard);
|
||||||
|
let mut new_map = HashMap::new();
|
||||||
|
|
||||||
|
if let Some(ref map) = unsafe { current.as_ref() } {
|
||||||
|
new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
|
||||||
|
}
|
||||||
|
|
||||||
|
let old_value = new_map.insert(key.clone(), value.clone());
|
||||||
|
|
||||||
|
let new_ptr = Owned::new(new_map);
|
||||||
|
if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() {
|
||||||
|
return old_value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, key: &str, guard: &Guard) -> Option<ShardNode> {
|
||||||
|
let current = self.inner.load(Ordering::Acquire, guard);
|
||||||
|
if let Some(map) = unsafe { current.as_ref() } {
|
||||||
|
map.get(key).cloned()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&self, key: &str, guard: &Guard) -> Option<ShardNode> {
|
||||||
|
loop {
|
||||||
|
let current = self.inner.load(Ordering::Acquire, guard);
|
||||||
|
if let Some(map) = unsafe { current.as_ref() } {
|
||||||
|
let mut new_map = HashMap::new();
|
||||||
|
new_map.extend(map.iter().filter(|(k, _)| **k != key).map(|(k, v)| (k.clone(), v.clone())));
|
||||||
|
|
||||||
|
let new_ptr = Owned::new(new_map);
|
||||||
|
if self.inner.compare_exchange(current, new_ptr, Ordering::Release, Ordering::Relaxed, guard).is_ok() {
|
||||||
|
return map.get(key).cloned();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self, guard: &Guard) -> usize {
|
||||||
|
let current = self.inner.load(Ordering::Acquire, guard);
|
||||||
|
if let Some(map) = unsafe { current.as_ref() } {
|
||||||
|
map.len()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter<'a>(&'a self, guard: &'a Guard) -> Vec<(&'a String, &'a ShardNode)> {
|
||||||
|
let current = self.inner.load(Ordering::Acquire, guard);
|
||||||
|
if let Some(map) = unsafe { current.as_ref() } {
|
||||||
|
map.iter().collect()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Информация о шард-узле с Raft
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct ShardNode {
|
||||||
|
pub node_id: String,
|
||||||
|
pub address: String,
|
||||||
|
pub capacity: u64,
|
||||||
|
pub used: u64,
|
||||||
|
pub collections: Vec<String>,
|
||||||
|
pub raft_info: RaftNode,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Состояние шардинга для коллекции
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct CollectionSharding {
|
||||||
|
pub shard_key: String,
|
||||||
|
pub virtual_nodes: usize,
|
||||||
|
pub ring: BTreeMap<u64, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// События репликации
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub enum ReplicationEvent {
|
||||||
|
Command(protocol::Command),
|
||||||
|
SyncRequest,
|
||||||
|
Heartbeat,
|
||||||
|
RaftVoteRequest { term: u64, candidate_id: String },
|
||||||
|
RaftVoteResponse { term: u64, vote_granted: bool },
|
||||||
|
RaftAppendEntries { term: u64, leader_id: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lock-Free очередь репликации
|
||||||
|
struct LockFreeReplicationQueue {
|
||||||
|
queue: SegQueue<ReplicationEvent>,
|
||||||
|
size: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LockFreeReplicationQueue {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
queue: SegQueue::new(),
|
||||||
|
size: AtomicUsize::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push(&self, event: ReplicationEvent) {
|
||||||
|
self.queue.push(event);
|
||||||
|
self.size.fetch_add(1, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pop(&self) -> Option<ReplicationEvent> {
|
||||||
|
let event = self.queue.pop();
|
||||||
|
if event.is_some() {
|
||||||
|
self.size.fetch_sub(1, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
event
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
self.size.load(Ordering::Acquire)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Менеджер шардинга и репликации с Raft
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ShardingManager {
|
||||||
|
// Шардинг компоненты
|
||||||
|
nodes: Arc<LockFreeNodeMap>,
|
||||||
|
collections: Arc<Atomic<HashMap<String, CollectionSharding>>>,
|
||||||
|
virtual_nodes_per_node: usize,
|
||||||
|
min_nodes_for_cluster: usize,
|
||||||
|
|
||||||
|
// Raft компоненты с atomic операциями
|
||||||
|
current_term: Arc<AtomicU64>,
|
||||||
|
voted_for: Arc<Atomic<HashMap<u64, String>>>,
|
||||||
|
is_leader: Arc<AtomicBool>,
|
||||||
|
raft_state: Arc<AtomicRaftState>,
|
||||||
|
cluster_formed: Arc<AtomicBool>,
|
||||||
|
|
||||||
|
// Репликация компоненты
|
||||||
|
replication_queue: Arc<LockFreeReplicationQueue>,
|
||||||
|
sequence_number: Arc<AtomicU64>,
|
||||||
|
replication_enabled: Arc<AtomicBool>,
|
||||||
|
node_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ShardingManager {
|
||||||
|
pub fn new(
|
||||||
|
virtual_nodes_per_node: usize,
|
||||||
|
replication_enabled: bool,
|
||||||
|
min_nodes_for_cluster: usize,
|
||||||
|
node_id: String
|
||||||
|
) -> Self {
|
||||||
|
let manager = Self {
|
||||||
|
nodes: Arc::new(LockFreeNodeMap::new()),
|
||||||
|
collections: Arc::new(Atomic::new(HashMap::new())),
|
||||||
|
virtual_nodes_per_node,
|
||||||
|
min_nodes_for_cluster,
|
||||||
|
current_term: Arc::new(AtomicU64::new(0)),
|
||||||
|
voted_for: Arc::new(Atomic::new(HashMap::new())),
|
||||||
|
is_leader: Arc::new(AtomicBool::new(false)),
|
||||||
|
raft_state: Arc::new(AtomicRaftState::new()),
|
||||||
|
cluster_formed: Arc::new(AtomicBool::new(false)),
|
||||||
|
replication_queue: Arc::new(LockFreeReplicationQueue::new()),
|
||||||
|
sequence_number: Arc::new(AtomicU64::new(0)),
|
||||||
|
replication_enabled: Arc::new(AtomicBool::new(replication_enabled)),
|
||||||
|
node_id,
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = manager.add_node(
|
||||||
|
manager.node_id.clone(),
|
||||||
|
"127.0.0.1:8081".to_string(),
|
||||||
|
1024 * 1024 * 1024
|
||||||
|
);
|
||||||
|
|
||||||
|
let manager_clone = manager.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
manager_clone.run_replication_loop().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
manager
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_replication_loop(self) {
|
||||||
|
let mut heartbeat_interval = interval(Duration::from_millis(1000));
|
||||||
|
let mut election_timeout = interval(Duration::from_millis(5000));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = heartbeat_interval.tick() => {
|
||||||
|
if self.is_leader.load(Ordering::SeqCst) &&
|
||||||
|
self.replication_enabled.load(Ordering::SeqCst) &&
|
||||||
|
self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
let _ = self.send_heartbeat().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = election_timeout.tick() => {
|
||||||
|
if !self.is_leader.load(Ordering::SeqCst) &&
|
||||||
|
self.replication_enabled.load(Ordering::SeqCst) &&
|
||||||
|
self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
let _ = self.start_election();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep(Duration::from_millis(10)) => {
|
||||||
|
while let Some(event) = self.replication_queue.pop() {
|
||||||
|
self.handle_replication_event(event).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_replication_event(&self, event: ReplicationEvent) {
|
||||||
|
if !self.replication_enabled.load(Ordering::SeqCst) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match event {
|
||||||
|
ReplicationEvent::Command(cmd) => {
|
||||||
|
self.replicate_command(cmd).await;
|
||||||
|
}
|
||||||
|
ReplicationEvent::SyncRequest => {
|
||||||
|
self.sync_with_nodes().await;
|
||||||
|
}
|
||||||
|
ReplicationEvent::Heartbeat => {
|
||||||
|
let _ = self.send_heartbeat().await;
|
||||||
|
}
|
||||||
|
ReplicationEvent::RaftVoteRequest { term, candidate_id } => {
|
||||||
|
self.handle_vote_request(term, candidate_id).await;
|
||||||
|
}
|
||||||
|
ReplicationEvent::RaftVoteResponse { term, vote_granted } => {
|
||||||
|
self.handle_vote_response(term, vote_granted).await;
|
||||||
|
}
|
||||||
|
ReplicationEvent::RaftAppendEntries { term, leader_id } => {
|
||||||
|
self.handle_append_entries(term, leader_id).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn replicate_command(&self, command: protocol::Command) {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let nodes: Vec<_> = self.nodes.iter(&guard)
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(_, v)| v.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for node in nodes {
|
||||||
|
if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let node_addr = node.address.clone();
|
||||||
|
let cmd_clone = command.clone();
|
||||||
|
let seq_clone = sequence;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
|
||||||
|
eprintln!("Failed to replicate to {}: {}", node_addr, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
|
||||||
|
let mut stream = match tokio::net::TcpStream::connect(node).await {
|
||||||
|
Ok(stream) => stream,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Failed to connect to {}: {}", node, e);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let message = protocol::ReplicationMessage {
|
||||||
|
sequence,
|
||||||
|
command: command.clone(),
|
||||||
|
timestamp: chrono::Utc::now().timestamp(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let bytes = protocol::serialize(&message)?;
|
||||||
|
|
||||||
|
if let Err(e) = stream.write_all(&bytes).await {
|
||||||
|
eprintln!("Failed to send command to {}: {}", node, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_with_nodes(&self) {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let node_count = self.nodes.len(&guard);
|
||||||
|
println!("Starting sync with {} nodes", node_count);
|
||||||
|
|
||||||
|
let nodes: Vec<String> = self.nodes.iter(&guard) // ИСПРАВЛЕНО: Явно указан тип String
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(_, v)| v.address.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for node_addr in nodes {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::sync_with_node(&node_addr).await {
|
||||||
|
eprintln!("Failed to sync with {}: {}", node_addr, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_with_node(_node: &str) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_heartbeat(&self) -> Result<()> {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let nodes: Vec<_> = self.nodes.iter(&guard)
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(_, v)| v.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for node in nodes {
|
||||||
|
if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id {
|
||||||
|
let node_addr = node.address.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await {
|
||||||
|
eprintln!("Heartbeat failed for {}: {}", node_addr, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_heartbeat_to_node(node: &str) -> Result<()> {
|
||||||
|
let mut stream = match tokio::net::TcpStream::connect(node).await {
|
||||||
|
Ok(stream) => stream,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let heartbeat = protocol::ReplicationMessage {
|
||||||
|
sequence: 0,
|
||||||
|
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
|
||||||
|
timestamp: chrono::Utc::now().timestamp(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let bytes = protocol::serialize(&heartbeat)?;
|
||||||
|
|
||||||
|
if let Err(e) = stream.write_all(&bytes).await {
|
||||||
|
eprintln!("Failed to send heartbeat to {}: {}", node, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_vote_request(&self, term: u64, candidate_id: String) {
|
||||||
|
let current_term = self.current_term.load(Ordering::SeqCst);
|
||||||
|
|
||||||
|
if term > current_term {
|
||||||
|
self.current_term.store(term, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let current = self.voted_for.load(Ordering::Acquire, &guard);
|
||||||
|
let mut new_map = HashMap::new();
|
||||||
|
|
||||||
|
if let Some(ref map) = unsafe { current.as_ref() } {
|
||||||
|
new_map.extend(map.iter().map(|(k, v)| (*k, v.clone())));
|
||||||
|
}
|
||||||
|
|
||||||
|
new_map.insert(term, candidate_id.clone());
|
||||||
|
|
||||||
|
let new_ptr = Owned::new(new_map);
|
||||||
|
self.voted_for.store(new_ptr, Ordering::Release);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
|
||||||
|
if vote_granted && term == self.current_term.load(Ordering::SeqCst) {
|
||||||
|
// Подсчет голосов и переход в лидеры
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let node_count = self.nodes.len(&guard);
|
||||||
|
|
||||||
|
if node_count >= self.min_nodes_for_cluster {
|
||||||
|
// При получении большинства голосов становимся лидером
|
||||||
|
match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.is_leader.store(true, Ordering::SeqCst);
|
||||||
|
println!("Elected as leader for term {}", term);
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Кто-то уже стал лидером
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_append_entries(&self, term: u64, leader_id: String) {
|
||||||
|
let current_term = self.current_term.load(Ordering::SeqCst);
|
||||||
|
|
||||||
|
if term >= current_term {
|
||||||
|
self.current_term.store(term, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// Atomic изменение состояния
|
||||||
|
match self.raft_state.compare_exchange(RaftState::Candidate, RaftState::Follower, Ordering::SeqCst) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.is_leader.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
if let Some(mut node) = self.nodes.get(&self.node_id, &guard) {
|
||||||
|
node.raft_info.state = RaftState::Follower;
|
||||||
|
node.raft_info.term = term;
|
||||||
|
node.raft_info.last_heartbeat = chrono::Utc::now().timestamp();
|
||||||
|
|
||||||
|
self.nodes.insert(self.node_id.clone(), node, &guard);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Уже в другом состоянии
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
|
||||||
|
let raft_node = RaftNode {
|
||||||
|
node_id: node_id.clone(),
|
||||||
|
address: address.clone(),
|
||||||
|
state: RaftState::Follower,
|
||||||
|
term: 0,
|
||||||
|
voted_for: None,
|
||||||
|
last_heartbeat: chrono::Utc::now().timestamp(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let node = ShardNode {
|
||||||
|
node_id: node_id.clone(),
|
||||||
|
address,
|
||||||
|
capacity,
|
||||||
|
used: 0,
|
||||||
|
collections: Vec::new(),
|
||||||
|
raft_info: raft_node,
|
||||||
|
};
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
self.nodes.insert(node_id, node, &guard);
|
||||||
|
|
||||||
|
let node_count = self.nodes.len(&guard);
|
||||||
|
if node_count >= self.min_nodes_for_cluster {
|
||||||
|
self.cluster_formed.store(true, Ordering::SeqCst);
|
||||||
|
println!("Cluster formed with {} nodes (minimum required: {})",
|
||||||
|
node_count, self.min_nodes_for_cluster);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_node(&self, node_id: &str) -> Result<()> {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
self.nodes.remove(node_id, &guard);
|
||||||
|
|
||||||
|
let node_count = self.nodes.len(&guard);
|
||||||
|
if node_count < self.min_nodes_for_cluster {
|
||||||
|
self.cluster_formed.store(false, Ordering::SeqCst);
|
||||||
|
self.is_leader.store(false, Ordering::SeqCst);
|
||||||
|
println!("Cluster no longer formed. Have {} nodes (need {})",
|
||||||
|
node_count, self.min_nodes_for_cluster);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.",
|
||||||
|
self.min_nodes_for_cluster)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let sharding = CollectionSharding {
|
||||||
|
shard_key: shard_key.to_string(),
|
||||||
|
virtual_nodes: self.virtual_nodes_per_node,
|
||||||
|
ring: BTreeMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let current = self.collections.load(Ordering::Acquire, &guard);
|
||||||
|
let mut new_map = HashMap::new();
|
||||||
|
|
||||||
|
if let Some(ref map) = unsafe { current.as_ref() } {
|
||||||
|
new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
|
||||||
|
}
|
||||||
|
|
||||||
|
new_map.insert(collection.to_string(), sharding);
|
||||||
|
|
||||||
|
let new_ptr = Owned::new(new_map);
|
||||||
|
self.collections.store(new_ptr, Ordering::Release);
|
||||||
|
|
||||||
|
self.rebuild_ring(collection)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rebuild_ring(&self, collection: &str) -> Result<()> {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let collections_current = self.collections.load(Ordering::Acquire, &guard);
|
||||||
|
|
||||||
|
if let Some(collections_map) = unsafe { collections_current.as_ref() } {
|
||||||
|
if let Some(mut sharding) = collections_map.get(collection).cloned() {
|
||||||
|
sharding.ring.clear();
|
||||||
|
|
||||||
|
let nodes: Vec<String> = self.nodes.iter(&guard) // ИСПРАВЛЕНО: Явно указан тип String
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(k, _)| k.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for node_id in nodes {
|
||||||
|
for i in 0..sharding.virtual_nodes {
|
||||||
|
let key = format!("{}-{}", node_id, i);
|
||||||
|
let hash = self.hash_key(&key);
|
||||||
|
sharding.ring.insert(hash, node_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Обновляем коллекцию
|
||||||
|
let mut new_map = HashMap::new();
|
||||||
|
new_map.extend(collections_map.iter().map(|(k, v)| {
|
||||||
|
if k == collection {
|
||||||
|
(k.clone(), sharding.clone())
|
||||||
|
} else {
|
||||||
|
(k.clone(), v.clone())
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
let new_ptr = Owned::new(new_map);
|
||||||
|
self.collections.store(new_ptr, Ordering::Release);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn hash_key(&self, key: &str) -> u64 {
|
||||||
|
let mut hasher = SipHasher13::new();
|
||||||
|
key.hash(&mut hasher);
|
||||||
|
hasher.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Cannot find node: cluster not formed. Need at least {} nodes.",
|
||||||
|
self.min_nodes_for_cluster)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let collections_current = self.collections.load(Ordering::Acquire, &guard);
|
||||||
|
|
||||||
|
if let Some(collections_map) = unsafe { collections_current.as_ref() } {
|
||||||
|
if let Some(sharding) = collections_map.get(collection) {
|
||||||
|
let key_hash = self.hash_key(key_value);
|
||||||
|
|
||||||
|
let mut range = sharding.ring.range(key_hash..);
|
||||||
|
if let Some((_, node_id)) = range.next() {
|
||||||
|
return Ok(Some(node_id.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some((_, node_id)) = sharding.ring.iter().next() {
|
||||||
|
return Ok(Some(node_id.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.",
|
||||||
|
self.min_nodes_for_cluster)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
|
||||||
|
if self.nodes.get(from_node, &guard).is_none() {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Source node '{}' not found in cluster", from_node)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.nodes.get(to_node, &guard).is_none() {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Destination node '{}' not found in cluster", to_node)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Migrating shard for collection '{}' from {} to {} with key {}",
|
||||||
|
collection, from_node, to_node, shard_key);
|
||||||
|
|
||||||
|
self.rebuild_ring(collection)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rebalance_cluster(&self) -> Result<()> {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.",
|
||||||
|
self.min_nodes_for_cluster)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let guard = epoch::pin();
|
||||||
|
let node_count = self.nodes.len(&guard);
|
||||||
|
println!("Rebalancing cluster with {} nodes", node_count);
|
||||||
|
|
||||||
|
let collections_current = self.collections.load(Ordering::Acquire, &guard);
|
||||||
|
let mut new_collections = HashMap::new();
|
||||||
|
|
||||||
|
if let Some(collections_map) = unsafe { collections_current.as_ref() } {
|
||||||
|
for (name, sharding) in collections_map {
|
||||||
|
let mut new_sharding = sharding.clone();
|
||||||
|
new_sharding.ring.clear();
|
||||||
|
|
||||||
|
let nodes: Vec<String> = self.nodes.iter(&guard) // ИСПРАВЛЕНО: Явно указан тип String
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(k, _)| k.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for node_id in nodes {
|
||||||
|
for i in 0..new_sharding.virtual_nodes {
|
||||||
|
let key = format!("{}-{}", node_id, i);
|
||||||
|
let hash = self.hash_key(&key);
|
||||||
|
new_sharding.ring.insert(hash, node_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
new_collections.insert(name.clone(), new_sharding);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_ptr = Owned::new(new_collections);
|
||||||
|
self.collections.store(new_ptr, Ordering::Release);
|
||||||
|
|
||||||
|
self.rebalance_nodes(&guard)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rebalance_nodes(&self, guard: &Guard) -> Result<()> {
|
||||||
|
println!("Rebalancing nodes in cluster...");
|
||||||
|
|
||||||
|
let mut total_capacity = 0;
|
||||||
|
let mut total_used = 0;
|
||||||
|
let mut nodes_info = Vec::new();
|
||||||
|
|
||||||
|
for (_, node) in self.nodes.iter(guard) {
|
||||||
|
total_capacity += node.capacity;
|
||||||
|
total_used += node.used;
|
||||||
|
nodes_info.push((node.node_id.clone(), node.used, node.capacity));
|
||||||
|
}
|
||||||
|
|
||||||
|
let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 };
|
||||||
|
|
||||||
|
println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);
|
||||||
|
|
||||||
|
let mut overloaded_nodes = Vec::new();
|
||||||
|
let mut underloaded_nodes = Vec::new();
|
||||||
|
|
||||||
|
for (node_id, used, capacity) in nodes_info {
|
||||||
|
let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 };
|
||||||
|
|
||||||
|
if usage > avg_usage * 1.2 {
|
||||||
|
overloaded_nodes.push((node_id, usage));
|
||||||
|
} else if usage < avg_usage * 0.8 {
|
||||||
|
underloaded_nodes.push((node_id, usage));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Overloaded nodes: {}", overloaded_nodes.len());
|
||||||
|
println!("Underloaded nodes: {}", underloaded_nodes.len());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
|
||||||
|
let mut cluster_nodes = Vec::new();
|
||||||
|
let mut total_capacity = 0;
|
||||||
|
let mut total_used = 0;
|
||||||
|
let mut raft_nodes = Vec::new();
|
||||||
|
|
||||||
|
for (_, node) in self.nodes.iter(&guard) {
|
||||||
|
total_capacity += node.capacity;
|
||||||
|
total_used += node.used;
|
||||||
|
|
||||||
|
cluster_nodes.push(protocol::ShardInfo {
|
||||||
|
node_id: node.node_id.clone(),
|
||||||
|
address: node.address.clone(),
|
||||||
|
capacity: node.capacity,
|
||||||
|
used: node.used,
|
||||||
|
collections: node.collections.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
raft_nodes.push(protocol::RaftNodeInfo {
|
||||||
|
node_id: node.node_id.clone(),
|
||||||
|
address: node.address.clone(),
|
||||||
|
state: match node.raft_info.state {
|
||||||
|
RaftState::Leader => "leader".to_string(),
|
||||||
|
RaftState::Follower => "follower".to_string(),
|
||||||
|
RaftState::Candidate => "candidate".to_string(),
|
||||||
|
},
|
||||||
|
term: node.raft_info.term,
|
||||||
|
last_heartbeat: node.raft_info.last_heartbeat,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let rebalance_needed = {
|
||||||
|
if total_capacity == 0 {
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
let avg_usage = total_used as f64 / total_capacity as f64;
|
||||||
|
let mut needs_rebalance = false;
|
||||||
|
|
||||||
|
for (_, node) in self.nodes.iter(&guard) {
|
||||||
|
let usage = if node.capacity > 0 {
|
||||||
|
node.used as f64 / node.capacity as f64
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 {
|
||||||
|
needs_rebalance = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
needs_rebalance
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(protocol::ClusterStatus {
|
||||||
|
nodes: cluster_nodes,
|
||||||
|
total_capacity,
|
||||||
|
total_used,
|
||||||
|
rebalance_needed,
|
||||||
|
cluster_formed: self.cluster_formed.load(Ordering::SeqCst),
|
||||||
|
leader_exists: self.is_leader.load(Ordering::SeqCst),
|
||||||
|
raft_nodes,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
self.nodes.iter(&guard)
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(_, node)| node.raft_info.clone())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_cluster_formed(&self) -> bool {
|
||||||
|
self.cluster_formed.load(Ordering::SeqCst)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start_election(&self) -> Result<()> {
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
|
||||||
|
println!("Starting election for term {}", new_term);
|
||||||
|
|
||||||
|
self.is_leader.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// Atomic переход в состояние candidate
|
||||||
|
match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) {
|
||||||
|
Ok(_) => {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
if let Some(mut node) = self.nodes.get(&self.node_id, &guard) {
|
||||||
|
node.raft_info.state = RaftState::Candidate;
|
||||||
|
node.raft_info.term = new_term;
|
||||||
|
node.raft_info.voted_for = Some(self.node_id.clone());
|
||||||
|
|
||||||
|
self.nodes.insert(self.node_id.clone(), node, &guard);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.replication_queue.push(ReplicationEvent::RaftVoteRequest {
|
||||||
|
term: new_term,
|
||||||
|
candidate_id: self.node_id.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(current_state) => {
|
||||||
|
println!("Already in state {:?}, cannot start election", current_state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
|
||||||
|
if !self.replication_enabled.load(Ordering::SeqCst) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||||
|
return Err(crate::common::FutriixError::ShardingError(
|
||||||
|
"Cannot replicate: cluster not formed".to_string()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.replication_queue.push(ReplicationEvent::Command(command));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn request_sync(&self) -> Result<()> {
|
||||||
|
if !self.replication_enabled.load(Ordering::SeqCst) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.replication_queue.push(ReplicationEvent::SyncRequest);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_nodes(&self) -> Vec<ShardNode> {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
self.nodes.iter(&guard)
|
||||||
|
.into_iter() // ИСПРАВЛЕНО: Добавлен вызов into_iter()
|
||||||
|
.map(|(_, node)| node.clone())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_sequence_number(&self) -> u64 {
|
||||||
|
self.sequence_number.load(Ordering::SeqCst)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_replication_enabled(&self) -> bool {
|
||||||
|
self.replication_enabled.load(Ordering::SeqCst)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
|
||||||
|
let guard = epoch::pin();
|
||||||
|
self.nodes.get(node_id, &guard)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_node_id(&self) -> &str {
|
||||||
|
&self.node_id
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user