Upload files to "src/server"
This commit is contained in:
parent
75669a74cd
commit
bec9c7fdac
912
src/server/database.rs
Normal file
912
src/server/database.rs
Normal file
@ -0,0 +1,912 @@
|
||||
// src/server/database.rs
|
||||
//! Lock-Free документо-ориентированная база данных Futriix
|
||||
//!
|
||||
//! Реализует lock-free доступ к данным с использованием атомарных
|
||||
//! ссылок и lock-free структур данных для максимальной производительности.
|
||||
//! Автоматически добавляет временные метки ко всем операциям с документами.
|
||||
|
||||
#![allow(unused_imports)]
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, AtomicBool, AtomicUsize, Ordering};
|
||||
use std::collections::HashMap;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
use dashmap::DashMap;
|
||||
use crossbeam::epoch::{self, Atomic, Owned, Guard};
|
||||
use crossbeam::queue::SegQueue;
|
||||
|
||||
use crate::common::Result;
|
||||
use crate::common::protocol;
|
||||
|
||||
/// Триггеры для коллекций
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum TriggerEvent {
|
||||
BeforeCreate,
|
||||
AfterCreate,
|
||||
BeforeUpdate,
|
||||
AfterUpdate,
|
||||
BeforeDelete,
|
||||
AfterDelete,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Trigger {
|
||||
pub name: String,
|
||||
pub event: TriggerEvent,
|
||||
pub collection: String,
|
||||
pub lua_code: String,
|
||||
}
|
||||
|
||||
/// Типы индексов
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum IndexType {
|
||||
Primary,
|
||||
Secondary,
|
||||
}
|
||||
|
||||
/// Структура индекса
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Index {
|
||||
pub name: String,
|
||||
pub index_type: IndexType,
|
||||
pub field: String,
|
||||
pub unique: bool,
|
||||
}
|
||||
|
||||
/// Lock-Free хранение документов с использованием атомарных ссылок
|
||||
struct LockFreeDocumentStore {
|
||||
documents: Atomic<HashMap<String, Vec<u8>>>,
|
||||
sequence: AtomicU64,
|
||||
}
|
||||
|
||||
impl LockFreeDocumentStore {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
documents: Atomic::new(HashMap::new()),
|
||||
sequence: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-free вставка документа
|
||||
fn insert(&self, id: String, document: Vec<u8>, guard: &Guard) -> Result<()> {
|
||||
let current = self.documents.load(Ordering::Acquire, guard);
|
||||
let mut new_map = HashMap::new();
|
||||
|
||||
// Копируем существующие документы
|
||||
if let Some(ref map) = unsafe { current.as_ref() } {
|
||||
new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
|
||||
}
|
||||
|
||||
// Проверяем, нет ли уже такого ID
|
||||
if new_map.contains_key(&id) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Document with ID {} already exists", id)
|
||||
));
|
||||
}
|
||||
|
||||
// Добавляем новый документ
|
||||
new_map.insert(id, document);
|
||||
|
||||
// Atomically заменяем старую карту на новую
|
||||
let new_ptr = Owned::new(new_map);
|
||||
self.documents.store(new_ptr, Ordering::Release);
|
||||
|
||||
// Увеличиваем последовательность
|
||||
self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Lock-free чтение документа
|
||||
fn get(&self, id: &str, guard: &Guard) -> Option<Vec<u8>> {
|
||||
let current = self.documents.load(Ordering::Acquire, guard);
|
||||
if let Some(map) = unsafe { current.as_ref() } {
|
||||
map.get(id).cloned()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-free обновление документа
|
||||
fn update(&self, id: &str, document: Vec<u8>, guard: &Guard) -> Result<()> {
|
||||
let current = self.documents.load(Ordering::Acquire, guard);
|
||||
|
||||
if let Some(map) = unsafe { current.as_ref() } {
|
||||
if !map.contains_key(id) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Document not found: {}", id)
|
||||
));
|
||||
}
|
||||
|
||||
let mut new_map = HashMap::new();
|
||||
new_map.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
|
||||
new_map.insert(id.to_string(), document);
|
||||
|
||||
let new_ptr = Owned::new(new_map);
|
||||
self.documents.store(new_ptr, Ordering::Release);
|
||||
|
||||
self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Document not found: {}", id)
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-free удаление документа
|
||||
fn remove(&self, id: &str, guard: &Guard) -> Result<()> {
|
||||
let current = self.documents.load(Ordering::Acquire, guard);
|
||||
|
||||
if let Some(map) = unsafe { current.as_ref() } {
|
||||
if !map.contains_key(id) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Document not found: {}", id)
|
||||
));
|
||||
}
|
||||
|
||||
let mut new_map = HashMap::new();
|
||||
new_map.extend(map.iter()
|
||||
.filter(|(k, _)| *k != id)
|
||||
.map(|(k, v)| (k.clone(), v.clone())));
|
||||
|
||||
let new_ptr = Owned::new(new_map);
|
||||
self.documents.store(new_ptr, Ordering::Release);
|
||||
|
||||
self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Document not found: {}", id)
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-free получение всех документов
|
||||
fn get_all(&self, guard: &Guard) -> Vec<Vec<u8>> {
|
||||
let current = self.documents.load(Ordering::Acquire, guard);
|
||||
if let Some(map) = unsafe { current.as_ref() } {
|
||||
map.values().cloned().collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Получение количества документов
|
||||
fn len(&self, guard: &Guard) -> usize {
|
||||
let current = self.documents.load(Ordering::Acquire, guard);
|
||||
if let Some(map) = unsafe { current.as_ref() } {
|
||||
map.len()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-Free коллекция документов
|
||||
#[derive(Clone)]
|
||||
pub struct Collection {
|
||||
name: String,
|
||||
document_store: Arc<LockFreeDocumentStore>,
|
||||
triggers: Arc<SegQueue<Trigger>>, // Lock-free очередь триггеров
|
||||
indexes: Arc<DashMap<String, Index>>, // Lock-free хранение индексов
|
||||
index_data: Arc<DashMap<String, DashMap<String, Vec<String>>>>, // Lock-free данные индексов
|
||||
}
|
||||
|
||||
impl Collection {
|
||||
/// Создание новой lock-free коллекции
|
||||
pub fn new(name: String) -> Self {
|
||||
Self {
|
||||
name,
|
||||
document_store: Arc::new(LockFreeDocumentStore::new()),
|
||||
triggers: Arc::new(SegQueue::new()),
|
||||
indexes: Arc::new(DashMap::new()),
|
||||
index_data: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Функция для логирования операций с временной меткой
|
||||
fn log_operation(&self, operation: &str, id: &str) {
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
|
||||
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
|
||||
let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n",
|
||||
timestamp, self.name, operation, id);
|
||||
|
||||
if let Ok(mut file) = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open("futriix.log")
|
||||
{
|
||||
let _ = file.write_all(log_message.as_bytes());
|
||||
}
|
||||
|
||||
println!("{}", log_message.trim());
|
||||
}
|
||||
|
||||
/// Добавление временной метки к документу
|
||||
fn add_timestamp_to_document(&self, document: Vec<u8>, operation: &str) -> Result<Vec<u8>> {
|
||||
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
|
||||
|
||||
let mut doc_value: Value = serde_json::from_slice(&document)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||
|
||||
if let Value::Object(ref mut obj) = doc_value {
|
||||
obj.insert("_timestamp".to_string(), Value::String(timestamp.clone()));
|
||||
obj.insert("_operation".to_string(), Value::String(operation.to_string()));
|
||||
}
|
||||
|
||||
serde_json::to_vec(&doc_value)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))
|
||||
}
|
||||
|
||||
/// Добавление триггера
|
||||
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
|
||||
self.triggers.push(trigger);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Создание индекса
|
||||
pub fn create_index(&self, index: Index) -> Result<()> {
|
||||
if self.indexes.contains_key(&index.name) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Index already exists: {}", index.name)
|
||||
));
|
||||
}
|
||||
|
||||
// Создаем lock-free структуру для хранения данных индекса
|
||||
self.index_data.insert(index.name.clone(), DashMap::new());
|
||||
|
||||
let index_clone = index.clone();
|
||||
self.indexes.insert(index.name.clone(), index);
|
||||
|
||||
// Перестраиваем индекс для существующих документов
|
||||
self.rebuild_index(&index_clone.name)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Перестроение индекса
|
||||
fn rebuild_index(&self, index_name: &str) -> Result<()> {
|
||||
let guard = epoch::pin();
|
||||
|
||||
let index = self.indexes.get(index_name)
|
||||
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
|
||||
format!("Index not found: {}", index_name)
|
||||
))?;
|
||||
|
||||
let all_documents = self.document_store.get_all(&guard);
|
||||
let mut index_map = DashMap::new();
|
||||
|
||||
for document_bytes in all_documents {
|
||||
if let Ok(document) = serde_json::from_slice::<Value>(&document_bytes) {
|
||||
if let Some(field_value) = document.get(&index.field) {
|
||||
// Получаем ID документа из самого документа
|
||||
let id = document.get("_id")
|
||||
.or_else(|| document.get("id"))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
if !id.is_empty() {
|
||||
let value_str = field_value.to_string();
|
||||
let mut entry = index_map.entry(value_str).or_insert_with(Vec::new);
|
||||
entry.push(id);
|
||||
|
||||
// Проверка уникальности для уникальных индексов
|
||||
if index.unique && entry.len() > 1 {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Duplicate value {} for unique index {}", field_value, index_name)
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.index_data.insert(index_name.to_string(), index_map);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Обновление индекса при изменении документа
|
||||
fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> {
|
||||
let new_doc_value: Value = serde_json::from_slice(new_document)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||
|
||||
let old_doc_value: Option<Value> = old_document
|
||||
.and_then(|doc| serde_json::from_slice(doc).ok());
|
||||
|
||||
for index_entry in self.indexes.iter() {
|
||||
let index_name = index_entry.key();
|
||||
let index = index_entry.value();
|
||||
|
||||
if let Some(index_map) = self.index_data.get_mut(index_name) {
|
||||
// Удаляем старые значения из индекса
|
||||
if let Some(old_doc) = &old_doc_value {
|
||||
if let Some(old_value) = old_doc.get(&index.field) {
|
||||
let old_value_str = old_value.to_string();
|
||||
if let Some(mut entries) = index_map.get_mut(&old_value_str) {
|
||||
entries.retain(|id| id != document_id);
|
||||
if entries.is_empty() {
|
||||
index_map.remove(&old_value_str);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Добавляем новые значения в индекс
|
||||
if let Some(new_value) = new_doc_value.get(&index.field) {
|
||||
let new_value_str = new_value.to_string();
|
||||
let mut entries = index_map.entry(new_value_str).or_insert_with(Vec::new);
|
||||
|
||||
// Проверка уникальности
|
||||
if index.unique && !entries.is_empty() && entries[0] != document_id {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Duplicate value {} for unique index {}", new_value, index_name)
|
||||
));
|
||||
}
|
||||
|
||||
if !entries.contains(&document_id.to_string()) {
|
||||
entries.push(document_id.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Поиск по индексу
|
||||
pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result<Vec<String>> {
|
||||
let index_map = self.index_data.get(index_name)
|
||||
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
|
||||
format!("Index not found: {}", index_name)
|
||||
))?;
|
||||
|
||||
let value_str = value.to_string();
|
||||
Ok(index_map.get(&value_str)
|
||||
.map(|entry| entry.value().clone())
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
/// Lock-free создание документа с временной меткой
|
||||
pub fn create_document(&self, document: Vec<u8>) -> Result<String> {
|
||||
let guard = epoch::pin();
|
||||
|
||||
let id = Uuid::new_v4().to_string();
|
||||
|
||||
// Добавляем ID в документ
|
||||
let mut doc_value: Value = serde_json::from_slice(&document)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||
|
||||
if let Value::Object(ref mut obj) = doc_value {
|
||||
obj.insert("_id".to_string(), Value::String(id.clone()));
|
||||
}
|
||||
|
||||
let document_with_id = serde_json::to_vec(&doc_value)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||
|
||||
// Добавляем временную метку к документу
|
||||
let document_with_timestamp = self.add_timestamp_to_document(document_with_id, "create")?;
|
||||
|
||||
// Сначала проверяем индексы
|
||||
self.update_indexes(None, &document_with_timestamp, &id)?;
|
||||
|
||||
// Затем вставляем документ
|
||||
self.document_store.insert(id.clone(), document_with_timestamp, &guard)?;
|
||||
|
||||
// Логируем операцию
|
||||
self.log_operation("create", &id);
|
||||
|
||||
println!("Document created in collection '{}' with ID: {}", self.name, id);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Lock-free чтение документа
|
||||
pub fn read_document(&self, id: &str) -> Result<Option<Vec<u8>>> {
|
||||
let guard = epoch::pin();
|
||||
|
||||
// Логируем операцию чтения
|
||||
self.log_operation("read", id);
|
||||
|
||||
Ok(self.document_store.get(id, &guard))
|
||||
}
|
||||
|
||||
/// Lock-free обновление документа с временной меткой
|
||||
pub fn update_document(&self, id: &str, document: Vec<u8>) -> Result<()> {
|
||||
let guard = epoch::pin();
|
||||
|
||||
// Сначала читаем старый документ
|
||||
let old_document = self.document_store.get(id, &guard)
|
||||
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
|
||||
format!("Document not found: {}", id)
|
||||
))?;
|
||||
|
||||
// Добавляем временную метку к новому документу
|
||||
let document_with_timestamp = self.add_timestamp_to_document(document, "update")?;
|
||||
|
||||
// Обновляем индексы
|
||||
self.update_indexes(Some(&old_document), &document_with_timestamp, id)?;
|
||||
|
||||
// Обновляем документ
|
||||
self.document_store.update(id, document_with_timestamp, &guard)?;
|
||||
|
||||
// Логируем операцию
|
||||
self.log_operation("update", id);
|
||||
|
||||
println!("Document updated in collection '{}': {}", self.name, id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Lock-free удаление документа с временной меткой
|
||||
pub fn delete_document(&self, id: &str) -> Result<()> {
|
||||
let guard = epoch::pin();
|
||||
|
||||
// Сначала читаем старый документ
|
||||
let old_document = self.document_store.get(id, &guard)
|
||||
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
|
||||
format!("Document not found: {}", id)
|
||||
))?;
|
||||
|
||||
// Удаляем из индексов
|
||||
self.update_indexes(Some(&old_document), &[], id)?;
|
||||
|
||||
// Удаляем документ
|
||||
self.document_store.remove(id, &guard)?;
|
||||
|
||||
// Логируем операцию
|
||||
self.log_operation("delete", id);
|
||||
|
||||
println!("Document deleted from collection '{}': {}", self.name, id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Lock-free запрос документов
|
||||
pub fn query_documents(&self, filter: Vec<u8>) -> Result<Vec<Vec<u8>>> {
|
||||
let guard = epoch::pin();
|
||||
|
||||
// Логируем операцию запроса
|
||||
self.log_operation("query", "multiple");
|
||||
|
||||
let documents = self.document_store.get_all(&guard);
|
||||
|
||||
// Если есть фильтр, применяем его
|
||||
if !filter.is_empty() {
|
||||
if let Ok(filter_value) = serde_json::from_slice::<Value>(&filter) {
|
||||
return self.filter_documents(documents, &filter_value);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(documents)
|
||||
}
|
||||
|
||||
/// Фильтрация документов по JSON фильтру
|
||||
fn filter_documents(&self, documents: Vec<Vec<u8>>, filter: &Value) -> Result<Vec<Vec<u8>>> {
|
||||
let mut filtered = Vec::new();
|
||||
|
||||
for document in documents {
|
||||
if let Ok(doc_value) = serde_json::from_slice::<Value>(&document) {
|
||||
// Простая фильтрация - проверяем совпадение полей
|
||||
if Self::document_matches_filter(&doc_value, filter) {
|
||||
filtered.push(document);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(filtered)
|
||||
}
|
||||
|
||||
/// Проверка соответствия документа фильтру
|
||||
fn document_matches_filter(document: &Value, filter: &Value) -> bool {
|
||||
match filter {
|
||||
Value::Object(filter_obj) => {
|
||||
if let Value::Object(doc_obj) = document {
|
||||
for (key, filter_value) in filter_obj {
|
||||
match doc_obj.get(key) {
|
||||
Some(doc_value) => {
|
||||
if doc_value != filter_value {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
None => return false,
|
||||
}
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
_ => true, // Если фильтр не объект, пропускаем все документы
|
||||
}
|
||||
}
|
||||
|
||||
/// Получение имени коллекции (lock-free)
|
||||
#[allow(dead_code)]
|
||||
pub fn get_name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// Получение количества документов (lock-free)
|
||||
#[allow(dead_code)]
|
||||
pub fn count_documents(&self) -> Result<usize> {
|
||||
let guard = epoch::pin();
|
||||
Ok(self.document_store.len(&guard))
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-Free транзакция
|
||||
pub struct LockFreeTransaction {
|
||||
commands: SegQueue<protocol::Command>,
|
||||
active: AtomicBool,
|
||||
}
|
||||
|
||||
impl LockFreeTransaction {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
commands: SegQueue::new(),
|
||||
active: AtomicBool::new(true),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_command(&self, command: protocol::Command) -> Result<()> {
|
||||
if !self.active.load(Ordering::Acquire) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
"Transaction is not active".to_string()
|
||||
));
|
||||
}
|
||||
self.commands.push(command);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn commit(self, db: &Database) -> Result<Vec<protocol::Response>> {
|
||||
if !self.active.load(Ordering::Acquire) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
"Transaction is not active".to_string()
|
||||
));
|
||||
}
|
||||
|
||||
self.active.store(false, Ordering::Release);
|
||||
|
||||
let mut results = Vec::new();
|
||||
while let Some(cmd) = self.commands.pop() {
|
||||
match db.execute_command(cmd) {
|
||||
Ok(response) => results.push(response),
|
||||
Err(e) => {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Transaction failed: {}", e)
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn rollback(self) -> Result<()> {
|
||||
if !self.active.load(Ordering::Acquire) {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
"Transaction is not active".to_string()
|
||||
));
|
||||
}
|
||||
|
||||
self.active.store(false, Ordering::Release);
|
||||
|
||||
// Очищаем очередь команд
|
||||
while self.commands.pop().is_some() {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-Free база данных
|
||||
#[derive(Clone)]
|
||||
pub struct Database {
|
||||
collections: Arc<DashMap<String, Collection>>, // Lock-free хранение коллекций
|
||||
procedures: Arc<DashMap<String, Vec<u8>>>, // Lock-free хранение процедур
|
||||
transactions: Arc<DashMap<String, Arc<LockFreeTransaction>>>, // Lock-free транзакции
|
||||
triggers: Arc<DashMap<String, Vec<Trigger>>>, // Хранилище триггеров
|
||||
}
|
||||
|
||||
impl Database {
|
||||
/// Создание новой lock-free базы данных
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
collections: Arc::new(DashMap::new()),
|
||||
procedures: Arc::new(DashMap::new()),
|
||||
transactions: Arc::new(DashMap::new()),
|
||||
triggers: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock-Free получение или создание коллекции
|
||||
pub fn get_collection(&self, name: &str) -> Collection {
|
||||
if let Some(collection) = self.collections.get(name) {
|
||||
return collection.clone();
|
||||
}
|
||||
|
||||
// Создаем новую коллекцию lock-free способом
|
||||
let new_collection = Collection::new(name.to_string());
|
||||
self.collections.insert(name.to_string(), new_collection.clone());
|
||||
|
||||
new_collection
|
||||
}
|
||||
|
||||
/// Получение коллекции (без создания)
|
||||
fn get_collection_opt(&self, name: &str) -> Option<Collection> {
|
||||
self.collections.get(name).map(|entry| entry.value().clone())
|
||||
}
|
||||
|
||||
/// Lock-Free выполнение команды
|
||||
pub fn execute_command(&self, command: protocol::Command) -> Result<protocol::Response> {
|
||||
match command {
|
||||
protocol::Command::Create { collection, document } => {
|
||||
// Проверяем триггеры BeforeCreate
|
||||
self.execute_triggers(&collection, TriggerEvent::BeforeCreate, &document)?;
|
||||
|
||||
let coll = self.get_collection(&collection);
|
||||
match coll.create_document(document.clone()) {
|
||||
Ok(id) => {
|
||||
// Выполняем триггеры AfterCreate
|
||||
self.execute_triggers(&collection, TriggerEvent::AfterCreate, &document)?;
|
||||
Ok(protocol::Response::Success(id.into_bytes()))
|
||||
}
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
protocol::Command::Read { collection, id } => {
|
||||
let coll = self.get_collection(&collection);
|
||||
match coll.read_document(&id) {
|
||||
Ok(Some(document)) => Ok(protocol::Response::Success(document)),
|
||||
Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))),
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
protocol::Command::Update { collection, id, document } => {
|
||||
// Читаем старый документ для триггеров
|
||||
let old_document = {
|
||||
let coll = self.get_collection(&collection);
|
||||
coll.read_document(&id).ok().flatten()
|
||||
};
|
||||
|
||||
// Проверяем триггеры BeforeUpdate
|
||||
self.execute_triggers(&collection, TriggerEvent::BeforeUpdate, &document)?;
|
||||
|
||||
let coll = self.get_collection(&collection);
|
||||
match coll.update_document(&id, document.clone()) {
|
||||
Ok(_) => {
|
||||
// Выполняем триггеры AfterUpdate
|
||||
self.execute_triggers(&collection, TriggerEvent::AfterUpdate, &document)?;
|
||||
Ok(protocol::Response::Success(vec![]))
|
||||
}
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
protocol::Command::Delete { collection, id } => {
|
||||
// Читаем старый документ для триггеров
|
||||
let old_document = {
|
||||
let coll = self.get_collection(&collection);
|
||||
coll.read_document(&id).ok().flatten()
|
||||
};
|
||||
|
||||
// Проверяем триггеры BeforeDelete
|
||||
if let Some(doc) = &old_document {
|
||||
self.execute_triggers(&collection, TriggerEvent::BeforeDelete, doc)?;
|
||||
}
|
||||
|
||||
let coll = self.get_collection(&collection);
|
||||
match coll.delete_document(&id) {
|
||||
Ok(_) => {
|
||||
// Выполняем триггеры AfterDelete
|
||||
if let Some(doc) = &old_document {
|
||||
self.execute_triggers(&collection, TriggerEvent::AfterDelete, doc)?;
|
||||
}
|
||||
Ok(protocol::Response::Success(vec![]))
|
||||
}
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
protocol::Command::Query { collection, filter } => {
|
||||
let coll = self.get_collection(&collection);
|
||||
match coll.query_documents(filter) {
|
||||
Ok(documents) => {
|
||||
let json_docs: Vec<Value> = documents.into_iter()
|
||||
.filter_map(|doc| serde_json::from_slice(&doc).ok())
|
||||
.collect();
|
||||
|
||||
match serde_json::to_vec(&json_docs) {
|
||||
Ok(data) => Ok(protocol::Response::Success(data)),
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
protocol::Command::CreateProcedure { name, code } => {
|
||||
self.procedures.insert(name, code);
|
||||
Ok(protocol::Response::Success(vec![]))
|
||||
}
|
||||
protocol::Command::CallProcedure { name } => {
|
||||
if let Some(code) = self.procedures.get(&name) {
|
||||
// TODO: Выполнить Lua код процедуры
|
||||
Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes()))
|
||||
} else {
|
||||
Ok(protocol::Response::Error(format!("Procedure not found: {}", name)))
|
||||
}
|
||||
}
|
||||
protocol::Command::BeginTransaction { transaction_id } => {
|
||||
if self.transactions.contains_key(&transaction_id) {
|
||||
return Ok(protocol::Response::Error("Transaction already exists".to_string()));
|
||||
}
|
||||
|
||||
let transaction = Arc::new(LockFreeTransaction::new());
|
||||
self.transactions.insert(transaction_id, transaction);
|
||||
Ok(protocol::Response::Success(vec![]))
|
||||
}
|
||||
protocol::Command::CommitTransaction { transaction_id } => {
|
||||
if let Some((_, transaction)) = self.transactions.remove(&transaction_id) {
|
||||
match Arc::try_unwrap(transaction) {
|
||||
Ok(transaction) => {
|
||||
match transaction.commit(self) {
|
||||
Ok(_) => Ok(protocol::Response::Success(vec![])),
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
Err(_) => Ok(protocol::Response::Error("Transaction is still referenced".to_string())),
|
||||
}
|
||||
} else {
|
||||
Ok(protocol::Response::Error("Transaction not found".to_string()))
|
||||
}
|
||||
}
|
||||
protocol::Command::RollbackTransaction { transaction_id } => {
|
||||
if let Some((_, transaction)) = self.transactions.remove(&transaction_id) {
|
||||
match Arc::try_unwrap(transaction) {
|
||||
Ok(transaction) => {
|
||||
match transaction.rollback() {
|
||||
Ok(_) => Ok(protocol::Response::Success(vec![])),
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
Err(_) => Ok(protocol::Response::Error("Transaction is still referenced".to_string())),
|
||||
}
|
||||
} else {
|
||||
Ok(protocol::Response::Error("Transaction not found".to_string()))
|
||||
}
|
||||
}
|
||||
protocol::Command::CreateIndex { collection, index } => {
|
||||
let coll = self.get_collection(&collection);
|
||||
match coll.create_index(index) {
|
||||
Ok(_) => Ok(protocol::Response::Success(vec![])),
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
protocol::Command::QueryByIndex { collection, index_name, value } => {
|
||||
let coll = self.get_collection(&collection);
|
||||
let value: Value = serde_json::from_slice(&value)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||
|
||||
match coll.query_by_index(&index_name, &value) {
|
||||
Ok(document_ids) => {
|
||||
let result = serde_json::to_vec(&document_ids)
|
||||
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
|
||||
Ok(protocol::Response::Success(result))
|
||||
}
|
||||
Err(e) => Ok(protocol::Response::Error(e.to_string())),
|
||||
}
|
||||
}
|
||||
// Обработка остальных команд...
|
||||
_ => {
|
||||
// TODO: Реализовать обработку остальных команд
|
||||
Ok(protocol::Response::Error("Command not implemented".to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Выполнение триггеров
|
||||
fn execute_triggers(&self, collection: &str, event: TriggerEvent, document: &[u8]) -> Result<()> {
|
||||
if let Some(triggers) = self.triggers.get(collection) {
|
||||
for trigger in triggers.value() {
|
||||
if trigger.event == event {
|
||||
// TODO: Выполнить Lua код триггера
|
||||
println!("Executing trigger '{}' for collection '{}' on {:?}",
|
||||
trigger.name, collection, event);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Добавление триггера к коллекции
|
||||
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
|
||||
let mut triggers = self.triggers
|
||||
.entry(trigger.collection.clone())
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
// Проверяем, нет ли уже триггера с таким именем
|
||||
for existing in triggers.iter() {
|
||||
if existing.name == trigger.name {
|
||||
return Err(crate::common::FutriixError::DatabaseError(
|
||||
format!("Trigger '{}' already exists for collection '{}'",
|
||||
trigger.name, trigger.collection)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
triggers.push(trigger);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Lock-Free получение статистики базы данных
|
||||
#[allow(dead_code)]
|
||||
pub fn get_stats(&self) -> Result<std::collections::HashMap<String, usize>> {
|
||||
let mut stats = std::collections::HashMap::new();
|
||||
stats.insert("collections".to_string(), self.collections.len());
|
||||
stats.insert("procedures".to_string(), self.procedures.len());
|
||||
stats.insert("active_transactions".to_string(), self.transactions.len());
|
||||
stats.insert("triggers".to_string(), self.triggers.len());
|
||||
|
||||
// Подсчет документов во всех коллекциях
|
||||
let total_documents: usize = self.collections.iter()
|
||||
.map(|entry| entry.value().count_documents().unwrap_or(0))
|
||||
.sum();
|
||||
|
||||
stats.insert("total_documents".to_string(), total_documents);
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
/// Создание бэкапа базы данных
|
||||
pub fn create_backup(&self) -> Result<std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>> {
|
||||
let mut backup = std::collections::HashMap::new();
|
||||
|
||||
for entry in self.collections.iter() {
|
||||
let name = entry.key().clone();
|
||||
let collection = entry.value();
|
||||
|
||||
// Используем guard для lock-free доступа
|
||||
let guard = epoch::pin();
|
||||
let documents = collection.query_documents(vec![])?;
|
||||
|
||||
let mut collection_backup = std::collections::HashMap::new();
|
||||
for document in documents {
|
||||
if let Ok(doc_value) = serde_json::from_slice::<Value>(&document) {
|
||||
if let Some(id) = doc_value.get("_id").and_then(|v| v.as_str()) {
|
||||
collection_backup.insert(id.to_string(), document);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
backup.insert(name, collection_backup);
|
||||
}
|
||||
|
||||
Ok(backup)
|
||||
}
|
||||
|
||||
/// Восстановление из бэкапа
|
||||
pub fn restore_from_backup(&self, backup: std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>) -> Result<()> {
|
||||
// Очищаем существующие коллекции
|
||||
self.collections.clear();
|
||||
|
||||
// Восстанавливаем данные из бэкапа
|
||||
for (collection_name, documents) in backup {
|
||||
let collection = self.get_collection(&collection_name);
|
||||
|
||||
for (id, document) in documents {
|
||||
// Восстанавливаем документ
|
||||
let command = protocol::Command::Create {
|
||||
collection: collection_name.clone(),
|
||||
document,
|
||||
};
|
||||
|
||||
if let Err(e) = self.execute_command(command) {
|
||||
eprintln!("Failed to restore document {}: {}", id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
767
src/server/sharding.rs
Normal file
767
src/server/sharding.rs
Normal file
@ -0,0 +1,767 @@
|
||||
// src/server/sharding.rs
|
||||
//! Модуль шардинга с консистентным хэшированием и Raft протоколом
|
||||
//!
|
||||
//! Объединяет функционал шардинга и репликации с lock-free архитектурой
|
||||
//! и реализацией Raft консенсуса для работы в production.
|
||||
|
||||
use std::collections::{HashMap, BTreeMap};
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{interval, Duration};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use siphasher::sip::SipHasher13;
|
||||
use dashmap::DashMap;
|
||||
use crossbeam::queue::SegQueue;
|
||||
|
||||
use crate::common::Result;
|
||||
use crate::common::protocol;
|
||||
|
||||
/// Состояния узла в Raft протоколе
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum RaftState {
|
||||
Follower,
|
||||
Candidate,
|
||||
Leader,
|
||||
}
|
||||
|
||||
/// Информация о Raft узле
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RaftNode {
|
||||
pub node_id: String,
|
||||
pub address: String,
|
||||
pub state: RaftState,
|
||||
pub term: u64,
|
||||
pub voted_for: Option<String>,
|
||||
pub last_heartbeat: i64,
|
||||
}
|
||||
|
||||
/// Информация о шард-узле с Raft
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ShardNode {
|
||||
pub node_id: String,
|
||||
pub address: String,
|
||||
pub capacity: u64,
|
||||
pub used: u64,
|
||||
pub collections: Vec<String>,
|
||||
pub raft_info: RaftNode,
|
||||
}
|
||||
|
||||
/// Состояние шардинга для коллекции
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CollectionSharding {
|
||||
pub shard_key: String,
|
||||
pub virtual_nodes: usize,
|
||||
pub ring: BTreeMap<u64, String>, // consistent hash ring
|
||||
}
|
||||
|
||||
/// События репликации
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ReplicationEvent {
|
||||
Command(protocol::Command),
|
||||
SyncRequest,
|
||||
Heartbeat,
|
||||
RaftVoteRequest { term: u64, candidate_id: String },
|
||||
RaftVoteResponse { term: u64, vote_granted: bool },
|
||||
RaftAppendEntries { term: u64, leader_id: String },
|
||||
}
|
||||
|
||||
/// Lock-Free очередь репликации
|
||||
struct LockFreeReplicationQueue {
|
||||
queue: SegQueue<ReplicationEvent>,
|
||||
size: AtomicUsize,
|
||||
}
|
||||
|
||||
impl LockFreeReplicationQueue {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
queue: SegQueue::new(),
|
||||
size: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&self, event: ReplicationEvent) {
|
||||
self.queue.push(event);
|
||||
self.size.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
fn pop(&self) -> Option<ReplicationEvent> {
|
||||
let event = self.queue.pop();
|
||||
if event.is_some() {
|
||||
self.size.fetch_sub(1, Ordering::SeqCst);
|
||||
}
|
||||
event
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.size.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
/// Менеджер шардинга и репликации с Raft
|
||||
#[derive(Clone)]
|
||||
pub struct ShardingManager {
|
||||
// Шардинг компоненты
|
||||
nodes: Arc<DashMap<String, ShardNode>>, // Lock-free хранение узлов
|
||||
collections: Arc<DashMap<String, CollectionSharding>>, // Lock-free хранение коллекций
|
||||
virtual_nodes_per_node: usize,
|
||||
min_nodes_for_cluster: usize,
|
||||
|
||||
// Raft компоненты
|
||||
current_term: Arc<AtomicU64>, // Текущий терм Raft
|
||||
voted_for: Arc<DashMap<u64, String>>, // Голоса за термы
|
||||
is_leader: Arc<AtomicBool>, // Флаг лидера
|
||||
cluster_formed: Arc<AtomicBool>, // Флаг сформированности кластера
|
||||
|
||||
// Репликация компоненты
|
||||
replication_queue: Arc<LockFreeReplicationQueue>,
|
||||
sequence_number: Arc<AtomicU64>,
|
||||
replication_enabled: Arc<AtomicBool>,
|
||||
node_id: String, // ID текущего узла
|
||||
}
|
||||
|
||||
impl ShardingManager {
|
||||
/// Создание нового менеджера шардинга и репликации
|
||||
pub fn new(
|
||||
virtual_nodes_per_node: usize,
|
||||
replication_enabled: bool,
|
||||
min_nodes_for_cluster: usize,
|
||||
node_id: String
|
||||
) -> Self {
|
||||
let manager = Self {
|
||||
nodes: Arc::new(DashMap::new()),
|
||||
collections: Arc::new(DashMap::new()),
|
||||
virtual_nodes_per_node,
|
||||
min_nodes_for_cluster,
|
||||
current_term: Arc::new(AtomicU64::new(0)),
|
||||
voted_for: Arc::new(DashMap::new()),
|
||||
is_leader: Arc::new(AtomicBool::new(false)),
|
||||
cluster_formed: Arc::new(AtomicBool::new(false)),
|
||||
replication_queue: Arc::new(LockFreeReplicationQueue::new()),
|
||||
sequence_number: Arc::new(AtomicU64::new(0)),
|
||||
replication_enabled: Arc::new(AtomicBool::new(replication_enabled)),
|
||||
node_id,
|
||||
};
|
||||
|
||||
// Добавляем текущий узел в кластер
|
||||
let _ = manager.add_node(
|
||||
manager.node_id.clone(),
|
||||
"127.0.0.1:8081".to_string(),
|
||||
1024 * 1024 * 1024
|
||||
);
|
||||
|
||||
// Запуск фоновой задачи обработки репликации и Raft
|
||||
let manager_clone = manager.clone();
|
||||
tokio::spawn(async move {
|
||||
manager_clone.run_replication_loop().await;
|
||||
});
|
||||
|
||||
manager
|
||||
}
|
||||
|
||||
/// Фоновая задача обработки репликации и Raft
|
||||
async fn run_replication_loop(self) {
|
||||
let mut heartbeat_interval = interval(Duration::from_millis(1000));
|
||||
let mut election_timeout = interval(Duration::from_millis(5000));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = heartbeat_interval.tick() => {
|
||||
// ИСПРАВЛЕНИЕ: Проверяем, что кластер сформирован перед отправкой heartbeat
|
||||
if self.is_leader.load(Ordering::SeqCst) &&
|
||||
self.replication_enabled.load(Ordering::SeqCst) &&
|
||||
self.cluster_formed.load(Ordering::SeqCst) {
|
||||
let _ = self.send_heartbeat().await;
|
||||
}
|
||||
}
|
||||
_ = election_timeout.tick() => {
|
||||
// Если мы follower и не получали heartbeat, начинаем выборы
|
||||
if !self.is_leader.load(Ordering::SeqCst) &&
|
||||
self.replication_enabled.load(Ordering::SeqCst) &&
|
||||
self.cluster_formed.load(Ordering::SeqCst) {
|
||||
let _ = self.start_election();
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_millis(10)) => {
|
||||
// Обработка событий из lock-free очереди
|
||||
while let Some(event) = self.replication_queue.pop() {
|
||||
self.handle_replication_event(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Обработка событий репликации
|
||||
async fn handle_replication_event(&self, event: ReplicationEvent) {
|
||||
if !self.replication_enabled.load(Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
|
||||
match event {
|
||||
ReplicationEvent::Command(cmd) => {
|
||||
self.replicate_command(cmd).await;
|
||||
}
|
||||
ReplicationEvent::SyncRequest => {
|
||||
self.sync_with_nodes().await;
|
||||
}
|
||||
ReplicationEvent::Heartbeat => {
|
||||
let _ = self.send_heartbeat().await;
|
||||
}
|
||||
ReplicationEvent::RaftVoteRequest { term, candidate_id } => {
|
||||
self.handle_vote_request(term, candidate_id).await;
|
||||
}
|
||||
ReplicationEvent::RaftVoteResponse { term, vote_granted } => {
|
||||
self.handle_vote_response(term, vote_granted).await;
|
||||
}
|
||||
ReplicationEvent::RaftAppendEntries { term, leader_id } => {
|
||||
self.handle_append_entries(term, leader_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Репликация команды на другие узлы
|
||||
async fn replicate_command(&self, command: protocol::Command) {
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
|
||||
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
for entry in self.nodes.iter() {
|
||||
let node = entry.value();
|
||||
// Реплицируем на все узлы, кроме текущего лидера (если мы лидер)
|
||||
if self.is_leader.load(Ordering::SeqCst) && node.raft_info.node_id == self.node_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let node_addr = node.address.clone();
|
||||
let cmd_clone = command.clone();
|
||||
let seq_clone = sequence;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
|
||||
eprintln!("Failed to replicate to {}: {}", node_addr, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Отправка команды на удаленный узел
|
||||
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
|
||||
let mut stream = match tokio::net::TcpStream::connect(node).await {
|
||||
Ok(stream) => stream,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to connect to {}: {}", node, e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let message = protocol::ReplicationMessage {
|
||||
sequence,
|
||||
command: command.clone(),
|
||||
timestamp: chrono::Utc::now().timestamp(),
|
||||
};
|
||||
|
||||
let bytes = protocol::serialize(&message)?;
|
||||
|
||||
if let Err(e) = stream.write_all(&bytes).await {
|
||||
eprintln!("Failed to send command to {}: {}", node, e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Синхронизация с другими узлами
|
||||
async fn sync_with_nodes(&self) {
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
|
||||
println!("Starting sync with {} nodes", self.nodes.len());
|
||||
for entry in self.nodes.iter() {
|
||||
let node_addr = entry.value().address.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = Self::sync_with_node(&node_addr).await {
|
||||
eprintln!("Failed to sync with {}: {}", node_addr, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Синхронизация с удаленным узлом
|
||||
async fn sync_with_node(_node: &str) -> Result<()> {
|
||||
// TODO: Реализовать lock-free синхронизацию данных
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Отправка heartbeat
|
||||
async fn send_heartbeat(&self) -> Result<()> {
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for entry in self.nodes.iter() {
|
||||
let node = entry.value();
|
||||
// Отправляем heartbeat только на follower узлы, кроме себя
|
||||
if node.raft_info.state == RaftState::Follower && node.raft_info.node_id != self.node_id {
|
||||
let node_addr = node.address.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await {
|
||||
eprintln!("Heartbeat failed for {}: {}", node_addr, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Отправка heartbeat на удаленный узел
|
||||
async fn send_heartbeat_to_node(node: &str) -> Result<()> {
|
||||
let mut stream = match tokio::net::TcpStream::connect(node).await {
|
||||
Ok(stream) => stream,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let heartbeat = protocol::ReplicationMessage {
|
||||
sequence: 0,
|
||||
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
|
||||
timestamp: chrono::Utc::now().timestamp(),
|
||||
};
|
||||
|
||||
let bytes = protocol::serialize(&heartbeat)?;
|
||||
|
||||
if let Err(e) = stream.write_all(&bytes).await {
|
||||
eprintln!("Failed to send heartbeat to {}: {}", node, e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Raft методы
|
||||
/// Обработка запроса голоса
|
||||
async fn handle_vote_request(&self, term: u64, candidate_id: String) {
|
||||
let current_term = self.current_term.load(Ordering::SeqCst);
|
||||
|
||||
if term > current_term {
|
||||
self.current_term.store(term, Ordering::SeqCst);
|
||||
self.voted_for.insert(term, candidate_id.clone());
|
||||
// TODO: Отправить положительный ответ
|
||||
}
|
||||
// TODO: Отправить отрицательный ответ если условия не выполнены
|
||||
}
|
||||
|
||||
/// Обработка ответа голоса
|
||||
async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
|
||||
if vote_granted && term == self.current_term.load(Ordering::SeqCst) {
|
||||
// TODO: Подсчитать голоса и перейти в лидеры при большинстве
|
||||
}
|
||||
}
|
||||
|
||||
/// Обработка AppendEntries RPC
|
||||
async fn handle_append_entries(&self, term: u64, leader_id: String) {
|
||||
let current_term = self.current_term.load(Ordering::SeqCst);
|
||||
|
||||
if term >= current_term {
|
||||
self.current_term.store(term, Ordering::SeqCst);
|
||||
self.is_leader.store(false, Ordering::SeqCst);
|
||||
|
||||
// Обновляем состояние узла как follower
|
||||
if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
|
||||
node.raft_info.state = RaftState::Follower;
|
||||
node.raft_info.term = term;
|
||||
node.raft_info.last_heartbeat = chrono::Utc::now().timestamp();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Шардинг методы
|
||||
/// Добавление шард-узла с Raft информацией
|
||||
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
|
||||
let raft_node = RaftNode {
|
||||
node_id: node_id.clone(),
|
||||
address: address.clone(),
|
||||
state: RaftState::Follower,
|
||||
term: 0,
|
||||
voted_for: None,
|
||||
last_heartbeat: chrono::Utc::now().timestamp(),
|
||||
};
|
||||
|
||||
let node = ShardNode {
|
||||
node_id: node_id.clone(),
|
||||
address,
|
||||
capacity,
|
||||
used: 0,
|
||||
collections: Vec::new(),
|
||||
raft_info: raft_node,
|
||||
};
|
||||
|
||||
self.nodes.insert(node_id, node);
|
||||
|
||||
// Проверяем сформированность кластера
|
||||
if self.nodes.len() >= self.min_nodes_for_cluster {
|
||||
self.cluster_formed.store(true, Ordering::SeqCst);
|
||||
println!("Cluster formed with {} nodes (minimum required: {})",
|
||||
self.nodes.len(), self.min_nodes_for_cluster);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Удаление шард-узла
|
||||
pub fn remove_node(&self, node_id: &str) -> Result<()> {
|
||||
self.nodes.remove(node_id);
|
||||
|
||||
// Проверяем сформированность кластера после удаления
|
||||
if self.nodes.len() < self.min_nodes_for_cluster {
|
||||
self.cluster_formed.store(false, Ordering::SeqCst);
|
||||
self.is_leader.store(false, Ordering::SeqCst);
|
||||
println!("Cluster no longer formed. Have {} nodes (need {})",
|
||||
self.nodes.len(), self.min_nodes_for_cluster);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Настройка шардинга для коллекции
|
||||
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
|
||||
// Проверка наличия кластера перед настройкой шардинга
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Cannot setup sharding: cluster not formed. Need at least {} nodes.",
|
||||
self.min_nodes_for_cluster)
|
||||
));
|
||||
}
|
||||
|
||||
let sharding = CollectionSharding {
|
||||
shard_key: shard_key.to_string(),
|
||||
virtual_nodes: self.virtual_nodes_per_node,
|
||||
ring: BTreeMap::new(),
|
||||
};
|
||||
|
||||
self.collections.insert(collection.to_string(), sharding);
|
||||
self.rebuild_ring(collection)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Перестроение хэш-ринга для коллекции
|
||||
fn rebuild_ring(&self, collection: &str) -> Result<()> {
|
||||
if let Some(mut sharding) = self.collections.get_mut(collection) {
|
||||
sharding.ring.clear();
|
||||
|
||||
for entry in self.nodes.iter() {
|
||||
let node_id = entry.key();
|
||||
for i in 0..sharding.virtual_nodes {
|
||||
let key = format!("{}-{}", node_id, i);
|
||||
let hash = self.hash_key(&key);
|
||||
sharding.ring.insert(hash, node_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Хэширование ключа
|
||||
fn hash_key(&self, key: &str) -> u64 {
|
||||
let mut hasher = SipHasher13::new();
|
||||
key.hash(&mut hasher);
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
/// Поиск узла для ключа
|
||||
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
|
||||
// Проверка наличия кластера перед поиском узла
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Cannot find node: cluster not formed. Need at least {} nodes.",
|
||||
self.min_nodes_for_cluster)
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(sharding) = self.collections.get(collection) {
|
||||
let key_hash = self.hash_key(key_value);
|
||||
|
||||
// Поиск в хэш-ринге (консистентное хэширование)
|
||||
let mut range = sharding.ring.range(key_hash..);
|
||||
if let Some((_, node_id)) = range.next() {
|
||||
return Ok(Some(node_id.clone()));
|
||||
}
|
||||
|
||||
// Если не найдено в верхней части ринга, берем первый узел
|
||||
if let Some((_, node_id)) = sharding.ring.iter().next() {
|
||||
return Ok(Some(node_id.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Миграция шарда
|
||||
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
|
||||
// Проверка наличия кластера перед миграцией
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Cannot migrate shard: cluster not formed. Need at least {} nodes.",
|
||||
self.min_nodes_for_cluster)
|
||||
));
|
||||
}
|
||||
|
||||
// Проверяем существование узлов
|
||||
if !self.nodes.contains_key(from_node) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Source node '{}' not found in cluster", from_node)
|
||||
));
|
||||
}
|
||||
|
||||
if !self.nodes.contains_key(to_node) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Destination node '{}' not found in cluster", to_node)
|
||||
));
|
||||
}
|
||||
|
||||
println!("Migrating shard for collection '{}' from {} to {} with key {}",
|
||||
collection, from_node, to_node, shard_key);
|
||||
|
||||
// TODO: Реализовать фактическую миграцию данных
|
||||
|
||||
self.rebuild_ring(collection)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ребалансировка кластера
|
||||
pub fn rebalance_cluster(&self) -> Result<()> {
|
||||
// Проверка наличия кластера перед ребалансировкой
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Cannot rebalance cluster: cluster not formed. Need at least {} nodes.",
|
||||
self.min_nodes_for_cluster)
|
||||
));
|
||||
}
|
||||
|
||||
println!("Rebalancing cluster with {} nodes", self.nodes.len());
|
||||
|
||||
// Перестраиваем все хэш-ринги
|
||||
for mut entry in self.collections.iter_mut() {
|
||||
let sharding = entry.value_mut();
|
||||
sharding.ring.clear();
|
||||
|
||||
for node_entry in self.nodes.iter() {
|
||||
let node_id = node_entry.key();
|
||||
for i in 0..sharding.virtual_nodes {
|
||||
let key = format!("{}-{}", node_id, i);
|
||||
let hash = self.hash_key(&key);
|
||||
sharding.ring.insert(hash, node_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ребалансировка узлов кластера
|
||||
self.rebalance_nodes()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ребалансировка узлов кластера
|
||||
fn rebalance_nodes(&self) -> Result<()> {
|
||||
println!("Rebalancing nodes in cluster...");
|
||||
|
||||
// Рассчитываем среднюю загрузку
|
||||
let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum();
|
||||
let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum();
|
||||
let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 };
|
||||
|
||||
println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);
|
||||
|
||||
// Находим перегруженные и недогруженные узлы
|
||||
let mut overloaded_nodes = Vec::new();
|
||||
let mut underloaded_nodes = Vec::new();
|
||||
|
||||
for entry in self.nodes.iter() {
|
||||
let node = entry.value();
|
||||
let usage = if node.capacity > 0 { node.used as f64 / node.capacity as f64 } else { 0.0 };
|
||||
|
||||
if usage > avg_usage * 1.2 { // Более чем на 20% выше среднего
|
||||
overloaded_nodes.push((node.node_id.clone(), usage));
|
||||
} else if usage < avg_usage * 0.8 { // Более чем на 20% ниже среднего
|
||||
underloaded_nodes.push((node.node_id.clone(), usage));
|
||||
}
|
||||
}
|
||||
|
||||
println!("Overloaded nodes: {}", overloaded_nodes.len());
|
||||
println!("Underloaded nodes: {}", underloaded_nodes.len());
|
||||
|
||||
// TODO: Реализовать алгоритм миграции данных между узлами
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Получение статуса кластера
|
||||
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
|
||||
let mut cluster_nodes = Vec::new();
|
||||
let mut total_capacity = 0;
|
||||
let mut total_used = 0;
|
||||
let mut raft_nodes = Vec::new();
|
||||
|
||||
for entry in self.nodes.iter() {
|
||||
let node = entry.value();
|
||||
total_capacity += node.capacity;
|
||||
total_used += node.used;
|
||||
|
||||
cluster_nodes.push(protocol::ShardInfo {
|
||||
node_id: node.node_id.clone(),
|
||||
address: node.address.clone(),
|
||||
capacity: node.capacity,
|
||||
used: node.used,
|
||||
collections: node.collections.clone(),
|
||||
});
|
||||
|
||||
raft_nodes.push(protocol::RaftNodeInfo {
|
||||
node_id: node.node_id.clone(),
|
||||
address: node.address.clone(),
|
||||
state: match node.raft_info.state {
|
||||
RaftState::Leader => "leader".to_string(),
|
||||
RaftState::Follower => "follower".to_string(),
|
||||
RaftState::Candidate => "candidate".to_string(),
|
||||
},
|
||||
term: node.raft_info.term,
|
||||
last_heartbeat: node.raft_info.last_heartbeat,
|
||||
});
|
||||
}
|
||||
|
||||
// Проверяем, нужна ли ребалансировка
|
||||
let rebalance_needed = {
|
||||
if total_capacity == 0 {
|
||||
false
|
||||
} else {
|
||||
let avg_usage = total_used as f64 / total_capacity as f64;
|
||||
let mut needs_rebalance = false;
|
||||
|
||||
for node in self.nodes.iter() {
|
||||
let usage = if node.value().capacity > 0 {
|
||||
node.value().used as f64 / node.value().capacity as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
if usage > avg_usage * 1.2 || usage < avg_usage * 0.8 {
|
||||
needs_rebalance = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
needs_rebalance
|
||||
}
|
||||
};
|
||||
|
||||
Ok(protocol::ClusterStatus {
|
||||
nodes: cluster_nodes,
|
||||
total_capacity,
|
||||
total_used,
|
||||
rebalance_needed,
|
||||
cluster_formed: self.cluster_formed.load(Ordering::SeqCst),
|
||||
leader_exists: self.is_leader.load(Ordering::SeqCst),
|
||||
raft_nodes,
|
||||
})
|
||||
}
|
||||
|
||||
/// Получение списка Raft узлов
|
||||
pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
|
||||
self.nodes.iter()
|
||||
.map(|entry| entry.value().raft_info.clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Проверка сформированности кластера
|
||||
pub fn is_cluster_formed(&self) -> bool {
|
||||
self.cluster_formed.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Raft выборы - начало кампании
|
||||
pub fn start_election(&self) -> Result<()> {
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
format!("Cluster not formed. Need at least {} nodes.", self.min_nodes_for_cluster)
|
||||
));
|
||||
}
|
||||
|
||||
let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
println!("Starting election for term {}", new_term);
|
||||
|
||||
// Переход в состояние candidate
|
||||
self.is_leader.store(false, Ordering::SeqCst);
|
||||
|
||||
// Обновляем состояние текущего узла
|
||||
if let Some(mut node) = self.nodes.get_mut(&self.node_id) {
|
||||
node.raft_info.state = RaftState::Candidate;
|
||||
node.raft_info.term = new_term;
|
||||
node.raft_info.voted_for = Some(self.node_id.clone());
|
||||
}
|
||||
|
||||
// Отправляем запросы на голосование
|
||||
self.replication_queue.push(ReplicationEvent::RaftVoteRequest {
|
||||
term: new_term,
|
||||
candidate_id: self.node_id.clone(),
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Отправка команды на репликацию
|
||||
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
|
||||
if !self.replication_enabled.load(Ordering::SeqCst) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !self.cluster_formed.load(Ordering::SeqCst) {
|
||||
return Err(crate::common::FutriixError::ShardingError(
|
||||
"Cannot replicate: cluster not formed".to_string()
|
||||
));
|
||||
}
|
||||
|
||||
self.replication_queue.push(ReplicationEvent::Command(command));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Запрос синхронизации с другими узлами
|
||||
pub async fn request_sync(&self) -> Result<()> {
|
||||
if !self.replication_enabled.load(Ordering::SeqCst) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.replication_queue.push(ReplicationEvent::SyncRequest);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Получение списка узлов репликации
|
||||
pub fn get_nodes(&self) -> Vec<ShardNode> {
|
||||
self.nodes.iter()
|
||||
.map(|entry| entry.value().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Получение текущего номера последовательности
|
||||
pub fn get_sequence_number(&self) -> u64 {
|
||||
self.sequence_number.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Проверка, включена ли репликация
|
||||
pub fn is_replication_enabled(&self) -> bool {
|
||||
self.replication_enabled.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Получение информации об узле
|
||||
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
|
||||
self.nodes.get(node_id).map(|entry| entry.value().clone())
|
||||
}
|
||||
|
||||
/// Получение ID текущего узла
|
||||
pub fn get_node_id(&self) -> &str {
|
||||
&self.node_id
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user