Upload files to "src/server"

This commit is contained in:
Григорий Сафронов 2025-12-06 15:57:56 +00:00
parent 4244b4c0c2
commit 1f9d1c3aad
3 changed files with 2277 additions and 0 deletions

View File

@ -0,0 +1,383 @@
// src/server/csv_import_export.rs
//! Lock-free модуль для импорта/экспорта данных в формате CSV
//!
//! Основные функции:
//! 1. Импорт CSV файлов в коллекции базы данных
//! 2. Экспорт коллекций в CSV файлы
//! 3. Отслеживание прогресса импорта с атомарными счетчиками
//! 4. Буферизация операций для повышения производительности
//!
//! Особенности:
//! - Атомарные операции без блокировок через DashMap
//! - Автоматическое определение типов данных
//! - Обработка больших файлов с пагинацией
//! - Отслеживание прогресса в реальном времени
use std::sync::Arc;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use csv::{Reader, Writer};
use serde_json::Value;
use dashmap::DashMap;
use tokio::sync::RwLock;
use crate::common::Result;
use crate::common::config::CsvConfig;
use crate::server::database::Database;
/// Concurrent table of import progress values.
/// Stores one `f64` per string key in a `DashMap` and keeps a separate
/// atomic counter of imports that are currently running.
struct LockFreeProgressMap {
// Progress value per key; callers in this file use the collection name as key
// and a percentage (0.0 ..= 100.0) as value.
progress_data: DashMap<String, f64>,
// Number of imports considered "in flight"; maintained by `insert`.
active_imports: AtomicUsize,
}
impl LockFreeProgressMap {
    /// Creates an empty progress table with no active imports.
    fn new() -> Self {
        Self {
            progress_data: DashMap::new(),
            active_imports: AtomicUsize::new(0),
        }
    }

    /// Stores `value` (a percentage) for `key` and returns the previous value, if any.
    ///
    /// The active-import counter follows *transitions* rather than magic values:
    /// a key becomes active when a value below 100 replaces a missing or finished
    /// entry, and stops being active when a value of 100 or more replaces an
    /// active entry. Repeating 0.0 or 100.0 for the same key therefore can no
    /// longer double-count an import or wrap the counter below zero.
    fn insert(&self, key: String, value: f64) -> Option<f64> {
        let previous = self.progress_data.insert(key, value);
        let was_active = previous.map_or(false, |p| p < 100.0);
        let is_active = value < 100.0;
        if is_active && !was_active {
            self.active_imports.fetch_add(1, Ordering::SeqCst);
        } else if was_active && !is_active {
            self.active_imports.fetch_sub(1, Ordering::SeqCst);
        }
        previous
    }

    /// Returns a copy of the progress value stored for `key`.
    fn get(&self, key: &str) -> Option<f64> {
        self.progress_data.get(key).map(|entry| *entry)
    }

    /// Removes the entry for `key` and returns its value.
    /// The active-import counter is not touched.
    fn remove(&self, key: &str) -> Option<f64> {
        self.progress_data.remove(key).map(|(_, v)| v)
    }

    /// Number of keys currently stored (active or finished).
    fn len(&self) -> usize {
        self.progress_data.len()
    }

    /// Current value of the active-import counter.
    fn active_imports(&self) -> usize {
        self.active_imports.load(Ordering::SeqCst)
    }
}
/// Manager for CSV import/export operations against the database.
#[derive(Clone)]
pub struct CsvManager {
// Database that receives `Create` commands on import and `Query` commands on export.
database: Arc<Database>,
// CSV settings; this file reads `import_dir`, `export_dir` and `max_file_size`.
config: CsvConfig,
// Shared progress table, keyed by collection name.
import_progress: Arc<LockFreeProgressMap>,
// Buffer map wrapped in an `RwLock`.
// NOTE(review): nothing in the visible part of this file reads or writes it.
import_buffer: Arc<RwLock<DashMap<String, Vec<Value>>>>,
}
impl CsvManager {
pub fn new(database: Arc<Database>, config: CsvConfig) -> Self {
// Создаем директории для импорта и экспорта, если они не существуют
let _ = std::fs::create_dir_all(&config.import_dir);
let _ = std::fs::create_dir_all(&config.export_dir);
Self {
database,
config,
import_progress: Arc::new(LockFreeProgressMap::new()),
import_buffer: Arc::new(RwLock::new(DashMap::new())),
}
}
pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
// Проверяем размер файла
let metadata = std::fs::metadata(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
if metadata.len() > self.config.max_file_size {
return Err(crate::common::FutriixError::CsvError(
format!("File size {} exceeds maximum allowed size {}",
metadata.len(), self.config.max_file_size)
));
}
println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name);
// Открываем файл для чтения
let file = File::open(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
let mut reader = Reader::from_reader(BufReader::new(file));
// Читаем заголовки
let headers: Vec<String> = reader.headers()?
.iter()
.map(|s| s.to_string())
.collect();
let mut record_count = 0;
// Убираем mut, так как переменная не изменяется
let error_count = 0;
// Начинаем импорт
self.import_progress.insert(collection_name.to_string(), 0.0);
// Буферизируем записи для batch вставки
let mut buffer = Vec::new();
for result in reader.records() {
let record = result?;
let mut document = serde_json::Map::new();
for (i, field) in record.iter().enumerate() {
let header = if i < headers.len() {
&headers[i]
} else {
&format!("field_{}", i)
};
let value = Self::parse_field_value(field);
document.insert(header.to_string(), value);
}
let json_value = Value::Object(document);
buffer.push(json_value);
if buffer.len() >= 100 {
// Вставляем пачку записей
let inserted = self.insert_batch(collection_name, &buffer)?;
record_count += inserted;
buffer.clear();
if record_count % 100 == 0 {
println!("Imported {} records...", record_count);
let progress = (record_count as f64 / 1000.0) * 100.0;
self.import_progress.insert(collection_name.to_string(), progress);
}
}
}
// Вставляем оставшиеся записи
if !buffer.is_empty() {
let inserted = self.insert_batch(collection_name, &buffer)?;
record_count += inserted;
}
// Завершаем импорт
self.import_progress.insert(collection_name.to_string(), 100.0);
if error_count > 0 {
println!("Import completed with {} successful records and {} errors",
record_count, error_count);
} else {
println!("Successfully imported {} records into collection '{}'",
record_count, collection_name);
}
Ok(record_count)
}
/// Sends one `Create` command per document and returns how many succeeded.
///
/// Documents rejected by the database are logged to stderr and skipped, so the
/// batch is not all-or-nothing. A serialization failure aborts the batch with
/// an error; documents stored before that point stay stored.
fn insert_batch(&self, collection_name: &str, documents: &[Value]) -> Result<usize> {
    let mut stored = 0;
    for doc in documents.iter() {
        let payload = serde_json::to_string(doc)?.into_bytes();
        let create = crate::common::protocol::Command::Create {
            collection: collection_name.to_string(),
            document: payload,
        };
        if let Err(err) = self.database.execute_command(create) {
            eprintln!("Failed to import record: {}", err);
            continue;
        }
        stored += 1;
    }
    Ok(stored)
}
/// Converts a raw CSV field into a JSON value.
///
/// Tried in order: empty -> `Null`; `i64` -> number; `f64` that JSON can
/// represent -> number; case-insensitive `true`/`false` -> boolean; anything
/// else -> string.
fn parse_field_value(field: &str) -> Value {
    if field.is_empty() {
        return Value::Null;
    }
    if let Ok(integer) = field.parse::<i64>() {
        return Value::Number(integer.into());
    }
    // `from_f64` rejects NaN and infinities; those fall through to the checks below.
    let as_float = field
        .parse::<f64>()
        .ok()
        .and_then(serde_json::Number::from_f64);
    if let Some(number) = as_float {
        return Value::Number(number);
    }
    match field.to_lowercase().as_str() {
        "true" => Value::Bool(true),
        "false" => Value::Bool(false),
        _ => Value::String(field.to_string()),
    }
}
/// Exports every document of `collection_name` to the CSV file at `file_path`.
///
/// The header row is the sorted union of all top-level keys found in the
/// documents, so column order is stable between runs. Documents that are not
/// JSON objects are skipped. Returns the number of rows written.
///
/// # Errors
/// Fails if the query fails, the response is not a JSON array, or the file
/// cannot be created or written.
pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
    println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path);

    // Query first: a failed query must not truncate an existing export file.
    let command = crate::common::protocol::Command::Query {
        collection: collection_name.to_string(),
        filter: vec![],
    };
    let documents = match self.database.execute_command(command)? {
        crate::common::protocol::Response::Success(data) => {
            serde_json::from_slice::<Vec<Value>>(&data)?
        }
        crate::common::protocol::Response::Error(e) => {
            return Err(crate::common::FutriixError::DatabaseError(e));
        }
    };

    // The file is still created for an empty collection, as before.
    let file = File::create(file_path)
        .map_err(crate::common::FutriixError::IoError)?;
    let mut writer = Writer::from_writer(BufWriter::new(file));

    if documents.is_empty() {
        println!("Collection '{}' is empty", collection_name);
        return Ok(0);
    }

    // A sorted set gives a deterministic column order.
    let headers: Vec<String> = documents
        .iter()
        .filter_map(Value::as_object)
        .flat_map(|obj| obj.keys().cloned())
        .collect::<std::collections::BTreeSet<String>>()
        .into_iter()
        .collect();
    writer.write_record(&headers)?;

    let mut record_count = 0;
    for document in &documents {
        if let Value::Object(obj) = document {
            // Keys missing from this document become empty cells.
            let record: Vec<String> = headers
                .iter()
                .map(|header| Self::value_to_string(obj.get(header).unwrap_or(&Value::Null)))
                .collect();
            writer.write_record(&record)?;
            record_count += 1;
            if record_count % 100 == 0 {
                println!("Exported {} records...", record_count);
            }
        }
    }
    writer.flush()?;
    println!("Successfully exported {} records to '{}'", record_count, file_path);
    Ok(record_count)
}
/// Renders a JSON value as the text of one CSV cell.
///
/// Scalars are written bare, `null` becomes an empty cell, arrays become
/// `[a,b,c]` with each element rendered by this same function, and objects
/// are written as their JSON text.
fn value_to_string(value: &Value) -> String {
    match value {
        Value::Null => String::new(),
        Value::Bool(flag) => flag.to_string(),
        Value::Number(number) => number.to_string(),
        Value::String(text) => text.to_owned(),
        Value::Array(elements) => {
            let mut rendered = String::from("[");
            for (idx, element) in elements.iter().enumerate() {
                if idx > 0 {
                    rendered.push(',');
                }
                rendered.push_str(&Self::value_to_string(element));
            }
            rendered.push(']');
            rendered
        }
        Value::Object(_) => value.to_string(),
    }
}
/// Returns the recorded import progress for `collection_name`,
/// or `0.0` when nothing has been recorded for it.
pub fn get_import_progress(&self, collection_name: &str) -> f64 {
    match self.import_progress.get(collection_name) {
        Some(percent) => percent,
        None => 0.0,
    }
}
/// Lists the file names (not full paths) of CSV files in the import directory.
///
/// The extension is matched case-insensitively, so `data.Csv` is found as well
/// as `data.csv` and `data.CSV`. An unreadable directory yields an empty list
/// rather than an error, and unreadable entries are skipped.
pub fn list_csv_files(&self) -> Result<Vec<String>> {
    let mut csv_files = Vec::new();
    if let Ok(entries) = std::fs::read_dir(&self.config.import_dir) {
        for entry in entries.flatten() {
            let path = entry.path();
            let is_csv = path.is_file()
                && path
                    .extension()
                    .map_or(false, |ext| ext.eq_ignore_ascii_case("csv"));
            if !is_csv {
                continue;
            }
            if let Some(file_name) = path.file_name() {
                csv_files.push(file_name.to_string_lossy().into_owned());
            }
        }
    }
    Ok(csv_files)
}
/// Joins `file_name` onto the configured import directory and returns the
/// result as a (lossily converted) string.
// NOTE(review): `file_name` is joined as-is, so an absolute path or `..`
// components can point outside the import directory — confirm callers validate it.
pub fn get_import_file_path(&self, file_name: &str) -> String {
    let base_dir = Path::new(&self.config.import_dir);
    let full_path = base_dir.join(file_name);
    full_path.to_string_lossy().into_owned()
}
/// Joins `file_name` onto the configured export directory and returns the
/// result as a (lossily converted) string.
// NOTE(review): `file_name` is joined as-is, so an absolute path or `..`
// components can point outside the export directory — confirm callers validate it.
pub fn get_export_file_path(&self, file_name: &str) -> String {
    let base_dir = Path::new(&self.config.export_dir);
    let full_path = base_dir.join(file_name);
    full_path.to_string_lossy().into_owned()
}
/// Reports whether anything exists at `file_path` on the file system.
pub fn file_exists(&self, file_path: &str) -> bool {
    let candidate: &Path = file_path.as_ref();
    candidate.exists()
}
/// Returns the current value of the shared active-import counter.
pub fn active_imports_count(&self) -> usize {
    let progress = &self.import_progress;
    progress.active_imports()
}
}

1063
src/server/database.rs Normal file

File diff suppressed because it is too large Load Diff

831
src/server/sharding.rs Normal file
View File

@ -0,0 +1,831 @@
// src/server/sharding.rs
//! Lock-free модуль шардинга с консистентным хэшированием и Raft протоколом
//!
//! Основные компоненты:
//! 1. ShardingManager - управление распределением данных по узлам кластера
//! 2. RaftState - состояния узлов в Raft протоколе для консенсуса
//! 3. CollectionSharding - настройки шардинга для отдельных коллекций
//! 4. Lock-free репликация с консистентным хэшированием
//!
//! Особенности:
//! - Консистентное хэширование для равномерного распределения данных
//! - Raft протокол для выбора лидера и консенсуса
//! - Атомарные операции без блокировок
//! - Автоматическая ребалансировка кластера
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
use tokio::io::AsyncWriteExt;
use serde::{Serialize, Deserialize};
use siphasher::sip::SipHasher13;
use crossbeam::queue::SegQueue;
use crossbeam::epoch::{self, Atomic, Owned, Guard};
use dashmap::{DashMap, DashSet};
use crate::common::Result;
use crate::common::protocol;
/// Role of a node in the Raft protocol.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RaftState {
// Initial role; `AtomicRaftState::new` and `add_node` both start here.
Follower,
// Entered by `start_election`.
Candidate,
// Entered by `handle_vote_response` when a vote is granted.
Leader,
}
/// `RaftState` stored in an atomic integer so it can be read and swapped
/// without a lock.
struct AtomicRaftState {
// Encoded state: 0 = Follower, 1 = Candidate, 2 = Leader.
inner: AtomicU64,
}
impl AtomicRaftState {
    /// Maps a state to its stored integer code.
    fn encode(state: &RaftState) -> u64 {
        match state {
            RaftState::Follower => 0,
            RaftState::Candidate => 1,
            RaftState::Leader => 2,
        }
    }

    /// Maps a stored integer back to a state; unknown codes read as `Follower`.
    fn decode(raw: u64) -> RaftState {
        match raw {
            1 => RaftState::Candidate,
            2 => RaftState::Leader,
            _ => RaftState::Follower,
        }
    }

    /// Creates the cell in the `Follower` state.
    fn new() -> Self {
        Self {
            inner: AtomicU64::new(0),
        }
    }

    /// Reads the current state (acquire load).
    fn get(&self) -> RaftState {
        Self::decode(self.inner.load(Ordering::Acquire))
    }

    /// Unconditionally overwrites the state (release store).
    fn set(&self, state: RaftState) {
        self.inner.store(Self::encode(&state), Ordering::Release);
    }

    /// Replaces `current` with `new` only if the stored state equals `current`.
    ///
    /// Returns `Ok(new)` on success, or `Err(actual)` with the state that was
    /// found instead. `order` is the success ordering; failure uses `Relaxed`.
    fn compare_exchange(&self, current: RaftState, new: RaftState, order: Ordering) -> std::result::Result<RaftState, RaftState> {
        let expected = Self::encode(&current);
        let desired = Self::encode(&new);
        self.inner
            .compare_exchange(expected, desired, order, Ordering::Relaxed)
            .map(|_| new)
            .map_err(Self::decode)
    }
}
/// Raft bookkeeping kept for one node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftNode {
pub node_id: String,
pub address: String,
// Role last recorded for this node in the node table.
pub state: RaftState,
// Term last recorded for this node.
pub term: u64,
// Set to the node's own id by `start_election`; `None` on creation.
pub voted_for: Option<String>,
// Unix timestamp in seconds (from `chrono::Utc::now().timestamp()`).
pub last_heartbeat: i64,
}
/// A cluster node that holds shards, together with its Raft information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardNode {
pub node_id: String,
// Address used for `TcpStream::connect` when replicating or sending heartbeats.
pub address: String,
// Capacity and usage in the same unit — presumably bytes (the local node is
// registered with 1024 * 1024 * 1024); confirm against callers.
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
pub raft_info: RaftNode,
}
/// Sharding configuration and hash ring for one collection.
#[derive(Debug, Clone)]
pub struct CollectionSharding {
pub shard_key: String,
// Number of ring positions generated per physical node by `rebuild_ring`.
pub virtual_nodes: usize,
// Hash ring: position hash -> node id. Held in an `Arc<DashMap>` so clones of
// this struct share the same ring.
pub ring: Arc<DashMap<u64, String>>,
}
/// Events processed by the background replication loop.
#[derive(Debug, Serialize, Deserialize)]
pub enum ReplicationEvent {
// A database command to forward to other nodes.
Command(protocol::Command),
// Request to synchronise with every known node.
SyncRequest,
// Request to send a heartbeat round.
Heartbeat,
// Raft messages; each is dispatched to the matching `handle_*` method.
RaftVoteRequest { term: u64, candidate_id: String },
RaftVoteResponse { term: u64, vote_granted: bool },
RaftAppendEntries { term: u64, leader_id: String },
}
/// Replication event queue built on crossbeam's `SegQueue`, with a separately
/// tracked element count.
struct LockFreeReplicationQueue {
queue: SegQueue<ReplicationEvent>,
// Element count maintained by `push` / `pop`.
size: AtomicUsize,
}
impl LockFreeReplicationQueue {
    /// Creates an empty queue.
    fn new() -> Self {
        Self {
            queue: SegQueue::new(),
            size: AtomicUsize::new(0),
        }
    }

    /// Appends `event` to the queue.
    ///
    /// The counter is bumped *before* the element becomes visible. Otherwise a
    /// concurrent `pop` could take the element and decrement first, wrapping
    /// the counter around to a huge value until this call caught up.
    fn push(&self, event: ReplicationEvent) {
        self.size.fetch_add(1, Ordering::SeqCst);
        self.queue.push(event);
    }

    /// Removes and returns the oldest event, or `None` when the queue is empty.
    fn pop(&self) -> Option<ReplicationEvent> {
        let event = self.queue.pop();
        if event.is_some() {
            self.size.fetch_sub(1, Ordering::SeqCst);
        }
        event
    }

    /// Approximate number of queued events; may briefly over-count during a
    /// concurrent `push`, but never wraps below zero.
    fn len(&self) -> usize {
        self.size.load(Ordering::Acquire)
    }
}
/// Manager for sharding and replication with Raft-style leader state.
/// Every shared field is behind an `Arc`, so clones operate on the same state.
#[derive(Clone)]
pub struct ShardingManager {
// Sharding components
// Known nodes, keyed by node id.
nodes: Arc<DashMap<String, ShardNode>>,
// Per-collection sharding settings, keyed by collection name.
collections: Arc<DashMap<String, CollectionSharding>>,
virtual_nodes_per_node: usize,
// Minimum node count for the cluster to be considered formed.
min_nodes_for_cluster: usize,
// Raft components, all updated with atomic operations
current_term: Arc<AtomicU64>,
// Vote recorded per term: term -> candidate id.
voted_for: Arc<DashMap<u64, String>>,
// Whether the local node currently considers itself leader.
is_leader: Arc<AtomicBool>,
raft_state: Arc<AtomicRaftState>,
cluster_formed: Arc<AtomicBool>,
// Replication components
replication_queue: Arc<LockFreeReplicationQueue>,
// Incremented once per replicated command.
sequence_number: Arc<AtomicU64>,
replication_enabled: Arc<AtomicBool>,
// Id of the local node.
node_id: String,
}
impl ShardingManager {
/// Builds a manager, registers the local node and starts the background
/// replication task.
///
/// The local node is registered under `node_id` with the fixed address
/// `127.0.0.1:8081` and a capacity of 1 GiB. The background task is started
/// with `tokio::spawn`, so this must be called from within a Tokio runtime.
pub fn new(
    virtual_nodes_per_node: usize,
    replication_enabled: bool,
    min_nodes_for_cluster: usize,
    node_id: String
) -> Self {
    let this = Self {
        node_id,
        virtual_nodes_per_node,
        min_nodes_for_cluster,
        nodes: Arc::new(DashMap::new()),
        collections: Arc::new(DashMap::new()),
        current_term: Arc::new(AtomicU64::new(0)),
        voted_for: Arc::new(DashMap::new()),
        is_leader: Arc::new(AtomicBool::new(false)),
        raft_state: Arc::new(AtomicRaftState::new()),
        cluster_formed: Arc::new(AtomicBool::new(false)),
        replication_queue: Arc::new(LockFreeReplicationQueue::new()),
        sequence_number: Arc::new(AtomicU64::new(0)),
        replication_enabled: Arc::new(AtomicBool::new(replication_enabled)),
    };

    // Register this node as the first cluster member.
    let local_id = this.node_id.clone();
    let _ = this.add_node(local_id, String::from("127.0.0.1:8081"), 1024 * 1024 * 1024);

    // The loop owns its own clone; all state is shared through the `Arc`s.
    tokio::spawn(this.clone().run_replication_loop());
    this
}
async fn run_replication_loop(self) {
let mut heartbeat_interval = interval(Duration::from_millis(1000));
let mut election_timeout = interval(Duration::from_millis(5000));
loop {
tokio::select! {
_ = heartbeat_interval.tick() => {
if self.is_leader.load(Ordering::SeqCst) &&
self.replication_enabled.load(Ordering::SeqCst) &&
self.cluster_formed.load(Ordering::SeqCst) {
let _ = self.send_heartbeat().await;
}
}
_ = election_timeout.tick() => {
if !self.is_leader.load(Ordering::SeqCst) &&
self.replication_enabled.load(Ordering::SeqCst) &&
self.cluster_formed.load(Ordering::SeqCst) {
let _ = self.start_election();
}
}
_ = tokio::time::sleep(Duration::from_millis(10)) => {
while let Some(event) = self.replication_queue.pop() {
self.handle_replication_event(event).await;
}
}
}
}
}
/// Dispatches one queued event to its handler.
/// Events are dropped without action while replication is disabled.
async fn handle_replication_event(&self, event: ReplicationEvent) {
    let enabled = self.replication_enabled.load(Ordering::SeqCst);
    if !enabled {
        return;
    }
    match event {
        ReplicationEvent::Heartbeat => {
            // A failed heartbeat round is not propagated.
            let _ = self.send_heartbeat().await;
        }
        ReplicationEvent::SyncRequest => self.sync_with_nodes().await,
        ReplicationEvent::Command(command) => self.replicate_command(command).await,
        ReplicationEvent::RaftVoteRequest { term, candidate_id } => {
            self.handle_vote_request(term, candidate_id).await
        }
        ReplicationEvent::RaftVoteResponse { term, vote_granted } => {
            self.handle_vote_response(term, vote_granted).await
        }
        ReplicationEvent::RaftAppendEntries { term, leader_id } => {
            self.handle_append_entries(term, leader_id).await
        }
    }
}
async fn replicate_command(&self, command: protocol::Command) {
if !self.cluster_formed.load(Ordering::SeqCst) {
return;
}
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
let nodes: Vec<ShardNode> = self.nodes.iter()
.map(|entry| entry.value().clone())
.collect();
for node in nodes {
if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id {
continue;
}
let node_addr = node.address.clone();
let cmd_clone = command.clone();
let seq_clone = sequence;
tokio::spawn(async move {
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
eprintln!("Failed to replicate to {}: {}", node_addr, e);
}
});
}
}
/// Opens a TCP connection to `node` and writes one serialized
/// `ReplicationMessage` carrying `command` and `sequence`.
///
/// Connection and write failures are logged and reported as `Ok(())`;
/// only a serialization failure is returned as an error.
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
    let mut stream = match tokio::net::TcpStream::connect(node).await {
        Err(e) => {
            eprintln!("Failed to connect to {}: {}", node, e);
            return Ok(());
        }
        Ok(connected) => connected,
    };
    let payload = protocol::serialize(&protocol::ReplicationMessage {
        sequence,
        command: command.clone(),
        timestamp: chrono::Utc::now().timestamp(),
    })?;
    stream
        .write_all(&payload)
        .await
        .unwrap_or_else(|e| eprintln!("Failed to send command to {}: {}", node, e));
    Ok(())
}
/// Starts one detached sync task per known node address.
/// Does nothing until the cluster is formed; failures are logged only.
async fn sync_with_nodes(&self) {
    if self.cluster_formed.load(Ordering::SeqCst) {
        println!("Starting sync with {} nodes", self.nodes.len());
        let addresses: Vec<String> = self.nodes.iter()
            .map(|entry| entry.address.clone())
            .collect();
        addresses.into_iter().for_each(|node_addr| {
            tokio::spawn(async move {
                if let Err(e) = Self::sync_with_node(&node_addr).await {
                    eprintln!("Failed to sync with {}: {}", node_addr, e);
                }
            });
        });
    }
}
/// Synchronises with a single node.
/// Currently a stub: the address is ignored and `Ok(())` is always returned.
async fn sync_with_node(_node: &str) -> Result<()> {
// Placeholder for the real synchronisation logic
Ok(())
}
/// Sends a heartbeat to every other node whose recorded state is `Follower`.
///
/// Does nothing until the cluster is formed. Each heartbeat runs as a detached
/// task and failures are logged, so this always returns `Ok(())`.
async fn send_heartbeat(&self) -> Result<()> {
    if !self.cluster_formed.load(Ordering::SeqCst) {
        return Ok(());
    }
    let follower_addrs: Vec<String> = self.nodes.iter()
        .filter(|entry| {
            let raft = &entry.value().raft_info;
            raft.state == RaftState::Follower && raft.node_id != self.node_id
        })
        .map(|entry| entry.value().address.clone())
        .collect();
    for node_addr in follower_addrs {
        tokio::spawn(async move {
            if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await {
                eprintln!("Heartbeat failed for {}: {}", node_addr, e);
            }
        });
    }
    Ok(())
}
/// Opens a TCP connection to `node` and writes one heartbeat message.
///
/// The heartbeat is a `ReplicationMessage` with sequence 0 wrapping a
/// `CallProcedure` command named `"heartbeat"`. Connection and write failures
/// are logged and reported as `Ok(())`; only a serialization failure is
/// returned as an error.
async fn send_heartbeat_to_node(node: &str) -> Result<()> {
    let mut stream = match tokio::net::TcpStream::connect(node).await {
        Err(e) => {
            eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
            return Ok(());
        }
        Ok(connected) => connected,
    };
    let payload = protocol::serialize(&protocol::ReplicationMessage {
        sequence: 0,
        command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
        timestamp: chrono::Utc::now().timestamp(),
    })?;
    stream
        .write_all(&payload)
        .await
        .unwrap_or_else(|e| eprintln!("Failed to send heartbeat to {}: {}", node, e));
    Ok(())
}
/// Handles a vote request from `candidate_id` for `term`.
///
/// If `term` is newer than the current term, the term is advanced and the vote
/// for that term is recorded for the candidate. The term is advanced with an
/// atomic `fetch_max`: the old load-then-store could move the term backwards
/// when `start_election` bumped it in between.
async fn handle_vote_request(&self, term: u64, candidate_id: String) {
    let previous_term = self.current_term.fetch_max(term, Ordering::SeqCst);
    if term > previous_term {
        self.voted_for.insert(term, candidate_id);
    }
}
/// Handles a vote response for `term`.
///
/// A granted vote for the current term promotes this node from `Candidate` to
/// `Leader`, provided the cluster has at least `min_nodes_for_cluster` nodes.
/// The local entry in the node table is updated too, so `get_cluster_status`
/// and `get_raft_nodes` report the new role instead of a stale "candidate".
// NOTE(review): a single granted vote is enough here; no majority is counted.
async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
    if !vote_granted || term != self.current_term.load(Ordering::SeqCst) {
        return;
    }
    if self.nodes.len() < self.min_nodes_for_cluster {
        return;
    }
    let promoted = self.raft_state
        .compare_exchange(RaftState::Candidate, RaftState::Leader, Ordering::SeqCst)
        .is_ok();
    if promoted {
        self.is_leader.store(true, Ordering::SeqCst);
        if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
            node.raft_info.state = RaftState::Leader;
            node.raft_info.term = term;
        }
        println!("Elected as leader for term {}", term);
    }
}
/// Handles an AppendEntries (heartbeat) message for `term`.
///
/// Messages from an older term are ignored. Otherwise this node adopts the
/// term, becomes or stays a `Follower`, and records the heartbeat time in its
/// node-table entry.
///
/// Previously only a `Candidate` reacted. A `Follower` never recorded
/// heartbeats or terms, and a `Leader` never stepped down for a higher term.
/// A `Leader` still ignores messages for its *own* term, as before.
/// `leader_id` is currently not recorded anywhere.
async fn handle_append_entries(&self, term: u64, leader_id: String) {
    let current_term = self.current_term.load(Ordering::SeqCst);
    if term < current_term {
        return;
    }
    if term == current_term && self.raft_state.get() == RaftState::Leader {
        return;
    }
    // `fetch_max` so a concurrent term bump is never rolled back.
    self.current_term.fetch_max(term, Ordering::SeqCst);
    self.raft_state.set(RaftState::Follower);
    self.is_leader.store(false, Ordering::SeqCst);
    if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
        node.raft_info.state = RaftState::Follower;
        node.raft_info.term = term;
        node.raft_info.last_heartbeat = chrono::Utc::now().timestamp();
    }
}
/// Registers a node, replacing any existing entry with the same id.
///
/// The node starts as a `Follower` in term 0 with no usage and no collections.
/// Once the node count reaches `min_nodes_for_cluster`, the cluster is marked
/// as formed. Always returns `Ok(())`.
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
    let entry = ShardNode {
        node_id: node_id.clone(),
        address: address.clone(),
        capacity,
        used: 0,
        collections: Vec::new(),
        raft_info: RaftNode {
            node_id: node_id.clone(),
            address,
            state: RaftState::Follower,
            term: 0,
            voted_for: None,
            last_heartbeat: chrono::Utc::now().timestamp(),
        },
    };
    self.nodes.insert(node_id, entry);

    let total = self.nodes.len();
    if total >= self.min_nodes_for_cluster {
        self.cluster_formed.store(true, Ordering::SeqCst);
        println!("Cluster formed with {} nodes (minimum required: {})",
            total, self.min_nodes_for_cluster);
    }
    Ok(())
}
/// Removes a node from the node table (a missing id is not an error).
///
/// If the remaining node count drops below `min_nodes_for_cluster`, the
/// cluster is marked as not formed and the leader flag is cleared.
/// Hash rings are not rebuilt here.
pub fn remove_node(&self, node_id: &str) -> Result<()> {
    let _removed = self.nodes.remove(node_id);
    let remaining = self.nodes.len();
    if remaining >= self.min_nodes_for_cluster {
        return Ok(());
    }
    self.cluster_formed.store(false, Ordering::SeqCst);
    self.is_leader.store(false, Ordering::SeqCst);
    println!("Cluster no longer formed. Have {} nodes (need {})",
        remaining, self.min_nodes_for_cluster);
    Ok(())
}
/// Enables sharding for `collection` on `shard_key` and builds its hash ring.
///
/// Any existing sharding configuration for the collection is replaced.
///
/// # Errors
/// Returns a `ShardingError` if the cluster is not formed.
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
    let formed = self.cluster_formed.load(Ordering::SeqCst);
    if !formed {
        let message = format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.",
            self.min_nodes_for_cluster);
        return Err(crate::common::FutriixError::ShardingError(message));
    }
    self.collections.insert(
        collection.to_string(),
        CollectionSharding {
            shard_key: shard_key.to_string(),
            virtual_nodes: self.virtual_nodes_per_node,
            ring: Arc::new(DashMap::new()),
        },
    );
    self.rebuild_ring(collection)
}
/// Rebuilds the hash ring of `collection` from the current node table.
///
/// The ring is cleared, then each node contributes `virtual_nodes` positions
/// hashed from the string `"<node_id>-<index>"`. A collection with no sharding
/// configuration is left alone. Always returns `Ok(())`.
fn rebuild_ring(&self, collection: &str) -> Result<()> {
    let mut entry = match self.collections.get_mut(collection) {
        Some(found) => found,
        None => return Ok(()),
    };
    let sharding = entry.value_mut();
    sharding.ring.clear();

    let node_ids: Vec<String> = self.nodes.iter()
        .map(|node_entry| node_entry.key().clone())
        .collect();
    for node_id in &node_ids {
        for replica in 0..sharding.virtual_nodes {
            let position = self.hash_key(&format!("{}-{}", node_id, replica));
            sharding.ring.insert(position, node_id.clone());
        }
    }
    Ok(())
}
/// Hashes `key` to a ring position using a default-constructed `SipHasher13`.
fn hash_key(&self, key: &str) -> u64 {
    let mut state = SipHasher13::new();
    Hash::hash(key, &mut state);
    Hasher::finish(&state)
}
/// Finds the node responsible for `key_value` in `collection`.
///
/// The owner is the node at the first ring position whose hash is >= the
/// key's hash, wrapping around to the smallest position if there is none.
/// Returns `Ok(None)` when the collection has no sharding configuration or
/// its ring is empty.
///
/// The ring is scanned once, tracking the best candidate, instead of cloning
/// and sorting every entry on each lookup. The result is the same.
///
/// # Errors
/// Returns a `ShardingError` if the cluster is not formed.
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
    if !self.cluster_formed.load(Ordering::SeqCst) {
        return Err(crate::common::FutriixError::ShardingError(
            format!("Cannot find node: cluster not formed. Need at least {} nodes.",
                self.min_nodes_for_cluster)
        ));
    }
    let sharding = match self.collections.get(collection) {
        Some(sharding) => sharding,
        None => return Ok(None),
    };
    let key_hash = self.hash_key(key_value);

    // Smallest position >= key_hash (clockwise successor).
    let mut successor: Option<(u64, String)> = None;
    // Smallest position overall (wrap-around target).
    let mut lowest: Option<(u64, String)> = None;
    for entry in sharding.ring.iter() {
        let hash = *entry.key();
        if hash >= key_hash && successor.as_ref().map_or(true, |(best, _)| hash < *best) {
            successor = Some((hash, entry.value().clone()));
        }
        if lowest.as_ref().map_or(true, |(best, _)| hash < *best) {
            lowest = Some((hash, entry.value().clone()));
        }
    }
    Ok(successor.or(lowest).map(|(_, node_id)| node_id))
}
/// Validates a shard migration request and rebuilds the collection's ring.
///
/// In the visible code no data is moved: after validation the request is
/// logged and the ring of `collection` is rebuilt from the node table.
///
/// # Errors
/// Returns a `ShardingError` if the cluster is not formed or if either
/// `from_node` or `to_node` is unknown (checked in that order).
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
    let reject = |message: String| -> Result<()> {
        Err(crate::common::FutriixError::ShardingError(message))
    };
    if !self.cluster_formed.load(Ordering::SeqCst) {
        return reject(format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.",
            self.min_nodes_for_cluster));
    }
    if !self.nodes.contains_key(from_node) {
        return reject(format!("Source node '{}' not found in cluster", from_node));
    }
    if !self.nodes.contains_key(to_node) {
        return reject(format!("Destination node '{}' not found in cluster", to_node));
    }
    println!("Migrating shard for collection '{}' from {} to {} with key {}",
        collection, from_node, to_node, shard_key);
    self.rebuild_ring(collection)
}
/// Rebuilds the hash ring of every sharded collection, then reports node load.
///
/// # Errors
/// Returns a `ShardingError` if the cluster is not formed.
pub fn rebalance_cluster(&self) -> Result<()> {
    if !self.cluster_formed.load(Ordering::SeqCst) {
        let message = format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.",
            self.min_nodes_for_cluster);
        return Err(crate::common::FutriixError::ShardingError(message));
    }
    println!("Rebalancing cluster with {} nodes", self.nodes.len());

    // Snapshot the names first so no map guard is held while rings are rebuilt.
    let collection_names: Vec<String> = self.collections.iter()
        .map(|entry| entry.key().clone())
        .collect();
    for name in &collection_names {
        self.rebuild_ring(name)?;
    }
    self.rebalance_nodes()
}
/// Prints a load report for the cluster.
///
/// Reports overall usage and how many nodes sit more than 20% above or below
/// the cluster-wide average usage ratio. Nothing is moved; the report is
/// purely informational. Always returns `Ok(())`.
fn rebalance_nodes(&self) -> Result<()> {
    println!("Rebalancing nodes in cluster...");

    // Snapshot (used, capacity) per node, accumulating the totals as we go.
    let mut total_capacity = 0;
    let mut total_used = 0;
    let mut loads = Vec::new();
    for node in self.nodes.iter() {
        total_capacity += node.capacity;
        total_used += node.used;
        loads.push((node.used, node.capacity));
    }

    let avg_usage = match total_capacity {
        0 => 0.0,
        capacity => total_used as f64 / capacity as f64,
    };
    println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);

    let mut overloaded = 0usize;
    let mut underloaded = 0usize;
    for (used, capacity) in loads {
        let usage = if capacity > 0 { used as f64 / capacity as f64 } else { 0.0 };
        if usage > avg_usage * 1.2 {
            overloaded += 1;
        } else if usage < avg_usage * 0.8 {
            underloaded += 1;
        }
    }
    println!("Overloaded nodes: {}", overloaded);
    println!("Underloaded nodes: {}", underloaded);
    Ok(())
}
/// Builds a snapshot of the cluster: per-node shard and Raft information,
/// capacity totals, and whether a rebalance looks necessary.
///
/// `rebalance_needed` is true when any node's usage ratio is more than 20%
/// above or below the cluster-wide average. `leader_exists` reflects only this
/// node's own leader flag. Always returns `Ok`.
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
    let mut total_capacity = 0;
    let mut total_used = 0;
    let mut shard_infos = Vec::new();
    let mut raft_infos = Vec::new();

    for node in self.nodes.iter() {
        total_capacity += node.capacity;
        total_used += node.used;

        let state_label = match node.raft_info.state {
            RaftState::Follower => "follower",
            RaftState::Candidate => "candidate",
            RaftState::Leader => "leader",
        };
        shard_infos.push(protocol::ShardInfo {
            node_id: node.node_id.clone(),
            address: node.address.clone(),
            capacity: node.capacity,
            used: node.used,
            collections: node.collections.clone(),
        });
        raft_infos.push(protocol::RaftNodeInfo {
            node_id: node.node_id.clone(),
            address: node.address.clone(),
            state: state_label.to_string(),
            term: node.raft_info.term,
            last_heartbeat: node.raft_info.last_heartbeat,
        });
    }

    // With zero total capacity there is no meaningful average to compare against.
    let rebalance_needed = total_capacity != 0 && {
        let avg_usage = total_used as f64 / total_capacity as f64;
        self.nodes.iter().any(|node| {
            let usage = if node.capacity > 0 {
                node.used as f64 / node.capacity as f64
            } else {
                0.0
            };
            usage > avg_usage * 1.2 || usage < avg_usage * 0.8
        })
    };

    Ok(protocol::ClusterStatus {
        nodes: shard_infos,
        total_capacity,
        total_used,
        rebalance_needed,
        cluster_formed: self.cluster_formed.load(Ordering::SeqCst),
        leader_exists: self.is_leader.load(Ordering::SeqCst),
        raft_nodes: raft_infos,
    })
}
/// Returns a copy of the Raft information of every known node.
pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
    let mut raft_nodes = Vec::new();
    for entry in self.nodes.iter() {
        raft_nodes.push(entry.value().raft_info.clone());
    }
    raft_nodes
}
/// Reports whether the cluster-formed flag is currently set.
pub fn is_cluster_formed(&self) -> bool {
    let formed = self.cluster_formed.load(Ordering::SeqCst);
    formed
}
/// Starts a leader election for the next term.
///
/// A `Follower` becomes a `Candidate`; a node that is already a `Candidate`
/// starts a fresh election in a new term. The term is incremented, the node's
/// table entry is updated with a vote for itself, and a vote request is queued.
/// A `Leader` is left untouched.
///
/// The state is checked *before* the term and leader flag are changed.
/// Previously the term was bumped and `is_leader` cleared first, so a `Leader`
/// lost its flag while remaining in `Leader` state, and a `Candidate` could
/// never start another election.
///
/// # Errors
/// Returns a `ShardingError` if the cluster is not formed.
pub fn start_election(&self) -> Result<()> {
    if !self.cluster_formed.load(Ordering::SeqCst) {
        return Err(crate::common::FutriixError::ShardingError(
            format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster)
        ));
    }
    match self.raft_state.compare_exchange(RaftState::Follower, RaftState::Candidate, Ordering::SeqCst) {
        // Newly a candidate, or still one from an unfinished election: proceed.
        Ok(_) | Err(RaftState::Candidate) => {}
        Err(current_state) => {
            println!("Already in state {:?}, cannot start election", current_state);
            return Ok(());
        }
    }

    let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
    println!("Starting election for term {}", new_term);
    self.is_leader.store(false, Ordering::SeqCst);
    if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
        node.raft_info.state = RaftState::Candidate;
        node.raft_info.term = new_term;
        node.raft_info.voted_for = Some(self.node_id.clone());
    }
    self.replication_queue.push(ReplicationEvent::RaftVoteRequest {
        term: new_term,
        candidate_id: self.node_id.clone(),
    });
    Ok(())
}
/// Queues `command` for replication by the background loop.
///
/// Returns `Ok(())` without queuing when replication is disabled.
///
/// # Errors
/// Returns a `ShardingError` if replication is enabled but the cluster is not formed.
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
    let enabled = self.replication_enabled.load(Ordering::SeqCst);
    if enabled {
        if !self.cluster_formed.load(Ordering::SeqCst) {
            let message = "Cannot replicate: cluster not formed".to_string();
            return Err(crate::common::FutriixError::ShardingError(message));
        }
        let event = ReplicationEvent::Command(command);
        self.replication_queue.push(event);
    }
    Ok(())
}
/// Queues a sync request for the background loop.
/// Does nothing when replication is disabled. Always returns `Ok(())`.
pub async fn request_sync(&self) -> Result<()> {
    if self.replication_enabled.load(Ordering::SeqCst) {
        self.replication_queue.push(ReplicationEvent::SyncRequest);
    }
    Ok(())
}
/// Returns a copy of every node in the node table.
pub fn get_nodes(&self) -> Vec<ShardNode> {
    let mut snapshot = Vec::new();
    for entry in self.nodes.iter() {
        snapshot.push(entry.value().clone());
    }
    snapshot
}
/// Returns the current value of the replication sequence counter.
pub fn get_sequence_number(&self) -> u64 {
    let counter = &self.sequence_number;
    counter.load(Ordering::SeqCst)
}
/// Reports whether the replication-enabled flag is currently set.
pub fn is_replication_enabled(&self) -> bool {
    let flag = &self.replication_enabled;
    flag.load(Ordering::SeqCst)
}
/// Returns a copy of the node registered under `node_id`, if any.
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
    let entry = self.nodes.get(node_id)?;
    Some(entry.value().clone())
}
/// Returns the id of the local node.
pub fn get_node_id(&self) -> &str {
    self.node_id.as_str()
}
}