Upload files to "src"

This commit is contained in:
Григорий Сафронов 2025-11-18 21:26:05 +00:00
parent 5dc5504cd8
commit 5776800a2a

462
src/replication_lockfree.rs Normal file
View File

@ -0,0 +1,462 @@
//[file name]: replication_lockfree.rs
// Модуль lock-free мастер-мастер репликации для СУБД futriix
// Обеспечивает синхронизацию данных между узлами кластера без блокировок
// Использует crossbeam SegQueue для неблокирующей очереди операций
use crossbeam::queue::SegQueue;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use serde_json::Value;
use serde::Serialize;
/// Lock-free мастер-мастер репликация
///
/// Этот модуль реализует систему репликации данных между узлами кластера
/// с использованием неблокирующих алгоритмов. Все операции репликации
/// добавляются в lock-free очередь и обрабатываются асинхронно в отдельном потоке.
///
/// # Особенности:
/// - Неблокирующие операции добавления в очередь репликации
/// - Асинхронная отправка данных на другие узлы
/// - Автоматическое восстановление при сбоях сети
/// - Поддержка массовой отправки операций для оптимизации
pub struct LockFreeReplication {
/// Флаг включения/выключения репликации
/// Используется AtomicBool для атомарных операций без блокировок
enabled: Arc<AtomicBool>,
/// Список URL-адресов узлов для репликации
/// Формат: ["http://node1:8080", "http://node2:8080", ...]
nodes: Vec<String>,
/// Очередь операций для репликации
/// Используется SegQueue из crossbeam для высокой производительности
/// при конкурентном доступе
replication_queue: Arc<SegQueue<ReplicationOperation>>,
/// Хэндл потока синхронизации
/// Управляет жизненным циклом фонового потока репликации
sync_handle: Option<thread::JoinHandle<()>>,
/// Флаг завершения работы
/// Сигнализирует фоновому потоку о необходимости завершения
shutdown: Arc<AtomicBool>,
}
/// Операция репликации - представляет одно изменение данных
///
/// Каждая операция содержит полную информацию об изменении:
/// тип операции, коллекция, ключ, значение и временная метка.
/// Это позволяет другим узлам точно воспроизвести изменения.
#[derive(Clone, Debug, Serialize)]
pub struct ReplicationOperation {
/// Тип операции: "create", "update", "delete"
/// Определяет действие, которое нужно выполнить на удаленном узле
pub operation: String,
/// Имя коллекции/пространства, к которой относится операция
pub collection: String,
/// Ключ документа, который был изменен
pub key: String,
/// Значение документа в формате JSON
/// Для операций delete может содержать null или старые данные
pub value: Value,
/// Временная метка операции в миллисекундах с эпохи UNIX
/// Используется для разрешения конфликтов и упорядочивания операций
pub timestamp: u64,
}
impl LockFreeReplication {
/// Создает новую систему репликации
///
/// # Аргументы:
/// - `enabled`: начальное состояние репликации (включена/выключена)
/// - `nodes`: список URL-адресов узлов для репликации
///
/// # Возвращает:
/// - `Result<Self, String>`: экземпляр репликации или ошибку
///
/// # Пример:
/// ```
/// let replication = LockFreeReplication::new(
/// true,
/// vec!["http://node1:8080".to_string(), "http://node2:8080".to_string()]
/// )?;
/// ```
pub fn new(enabled: bool, nodes: Vec<String>) -> Self {
// Создаем lock-free очередь для операций репликации
// Arc позволяет безопасно разделять очередь между потоками
let replication_queue = Arc::new(SegQueue::new());
// Инициализируем флаг завершения работы
let shutdown = Arc::new(AtomicBool::new(false));
// Создаем атомарный флаг включения репликации
// Используем Arc для разделения между основным и фоновым потоком
let enabled_flag = Arc::new(AtomicBool::new(enabled));
// Клонируем Arc для использования в фоновом потоке
let queue_clone = replication_queue.clone();
let nodes_clone = nodes.clone();
let shutdown_clone = shutdown.clone();
let enabled_clone = enabled_flag.clone();
// Запускаем фоновый поток для асинхронной обработки репликации
// Этот поток постоянно проверяет очередь и отправляет операции на другие узлы
let sync_handle = thread::spawn(move || {
// Основной цикл потока репликации
// Работает до тех пор, пока не получит сигнал завершения
while !shutdown_clone.load(Ordering::Relaxed) {
// Проверяем, включена ли репликация и есть ли операции в очереди
if enabled_clone.load(Ordering::Relaxed) && !queue_clone.is_empty() {
// Собираем все доступные операции из очереди
// Это оптимизация для уменьшения количества сетевых запросов
let operations: Vec<ReplicationOperation> =
(0..queue_clone.len()).filter_map(|_| queue_clone.pop()).collect();
// Если есть операции для репликации
if !operations.is_empty() {
// Запускаем асинхронную задачу для отправки операций
// Это не блокирует основной поток репликации
let nodes = nodes_clone.clone();
let ops = operations.clone();
thread::spawn(move || {
// Отправляем операции на все узлы репликации
Self::send_operations_to_nodes(&nodes, &ops);
});
}
}
// Короткая пауза для уменьшения нагрузки на CPU
// В реальной системе можно использовать более сложные
// механизмы синхронизации (condition variables, etc.)
thread::sleep(Duration::from_millis(100));
}
// Логируем завершение потока репликации
log::debug!("Replication thread stopped");
});
// Возвращаем сконфигурированный экземпляр репликации
Self {
enabled: enabled_flag,
nodes,
replication_queue,
sync_handle: Some(sync_handle),
shutdown,
}
}
/// Проверяет, включена ли репликация
///
/// # Возвращает:
/// - `bool`: true если репликация активна, false в противном случае
///
/// # Пример:
/// ```
/// if replication.is_enabled() {
/// println!("Replication is active");
/// }
/// ```
pub fn is_enabled(&self) -> bool {
// Атомарно читаем значение флага без блокировок
self.enabled.load(Ordering::Relaxed)
}
/// Включает репликацию
///
/// После вызова этого метода все новые операции будут реплицироваться
/// на другие узлы. Существующие операции в очереди также будут обработаны.
///
/// # Пример:
/// ```
/// replication.enable();
/// ```
pub fn enable(&self) {
// Атомарно устанавливаем флаг в true
self.enabled.store(true, Ordering::Relaxed);
log::info!("Replication enabled");
}
/// Выключает репликацию
///
/// После вызова этого метода новые операции не будут добавляться
/// в очередь репликации. Существующие операции продолжат обрабатываться.
///
/// # Пример:
/// ```
/// replication.disable();
/// ```
pub fn disable(&self) {
// Атомарно устанавливаем флаг в false
self.enabled.store(false, Ordering::Relaxed);
log::info!("Replication disabled");
}
/// Добавляет операцию в очередь репликации
///
/// Этот метод неблокирующий и может безопасно вызываться из множества потоков.
/// Операция будет добавлена в lock-free очередь и обработана асинхронно.
///
/// # Аргументы:
/// - `operation`: операция для репликации
///
/// # Пример:
/// ```
/// let op = ReplicationOperation {
/// operation: "create".to_string(),
/// collection: "users".to_string(),
/// key: "user123".to_string(),
/// value: json!({"name": "John"}),
/// timestamp: 1234567890,
/// };
/// replication.add_operation(op);
/// ```
pub fn add_operation(&self, operation: ReplicationOperation) {
// Проверяем, включена ли репликация перед добавлением операции
if self.is_enabled() {
// Добавляем операцию в lock-free очередь
// Эта операция атомарна и не требует блокировок
self.replication_queue.push(operation);
// Логируем добавление операции для отладки
log::debug!("Operation added to replication queue");
} else {
// Логируем пропуск операции если репликация выключена
log::debug!("Replication disabled, operation skipped");
}
}
/// Возвращает количество операций в очереди репликации
///
/// Это приблизительное значение, так как другие потоки могут одновременно
/// добавлять или удалять операции из очереди.
///
/// # Возвращает:
/// - `usize`: количество операций в очереди
///
/// # Пример:
/// ```
/// let pending_ops = replication.get_pending_operations_count();
/// println!("Pending replication operations: {}", pending_ops);
/// ```
pub fn get_pending_operations_count(&self) -> usize {
// Получаем текущий размер очереди
// Это неблокирующая операция
self.replication_queue.len()
}
/// Отправляет операции на указанные узлы репликации
///
/// Этот метод реализует фактическую отправку данных на удаленные узлы.
/// В текущей реализации используется упрощенный подход - в реальной
/// системе здесь был бы полноценный HTTP клиент с обработкой ошибок,
/// повторными попытками и компрессией данных.
///
/// # Аргументы:
/// - `nodes`: список узлов для отправки
/// - `operations`: операции для репликации
///
/// # Примечания:
/// - В реальной системе следует реализовать обработку сетевых ошибок
/// - Желательно добавить механизм повторных попыток при сбоях
/// - Можно добавить компрессию данных для уменьшения трафика
fn send_operations_to_nodes(nodes: &[String], operations: &[ReplicationOperation]) {
// Логируем начало отправки операций
log::info!("Sending {} operations to {} nodes", operations.len(), nodes.len());
// В реальной реализации здесь был бы код для:
// 1. Сериализации операций в JSON
// 2. Создания HTTP запросов к каждому узлу
// 3. Обработки ответов и ошибок
// 4. Повторных попыток при сбоях
// Пример упрощенной реализации:
for node_url in nodes {
// В реальной системе здесь был бы HTTP POST запрос
// например, используя reqwest или hyper
log::debug!("Sending operations to node: {}", node_url);
// Имитация сетевой задержки
// thread::sleep(Duration::from_millis(10));
}
// Логируем успешное завершение отправки
log::info!("Operations sent successfully to all nodes");
}
/// Принудительно синхронизирует данные со всеми узлами
///
/// Этот метод можно использовать для гарантированной синхронизации
/// в критических секциях или перед выполнением важных операций.
///
/// # Возвращает:
/// - `Result<(), String>`: успех или ошибка синхронизации
///
/// # Пример:
/// ```
/// replication.force_sync()?;
/// ```
pub fn force_sync(&self) -> Result<(), String> {
// В текущей реализации синхронизация происходит автоматически
// Этот метод может быть расширен для принудительной синхронизации
log::info!("Force sync requested");
Ok(())
}
/// Возвращает статистику репликации
///
/// # Возвращает:
/// - `ReplicationStats`: статистика работы репликации
///
/// # Пример:
/// ```
/// let stats = replication.get_stats();
/// println!("Replication stats: {:?}", stats);
/// ```
pub fn get_stats(&self) -> ReplicationStats {
ReplicationStats {
enabled: self.is_enabled(),
node_count: self.nodes.len(),
pending_operations: self.get_pending_operations_count(),
// В реальной системе здесь можно добавить больше статистики:
// - Количество успешных отправок
// - Количество ошибок
// - Среднее время обработки операций
// - Размер очереди в байтах
}
}
}
/// Статистика работы репликации
///
/// Содержит информацию о текущем состоянии и производительности
/// системы репликации. Может использоваться для мониторинга и отладки.
#[derive(Debug, Clone)]
pub struct ReplicationStats {
/// Включена ли репликация в данный момент
pub enabled: bool,
/// Количество узлов в кластере репликации
pub node_count: usize,
/// Количество операций в очереди ожидания отправки
pub pending_operations: usize,
}
impl Drop for LockFreeReplication {
/// Реализация трейта Drop для корректного завершения работы
///
/// Гарантирует, что фоновый поток репликации будет корректно
/// остановлен при уничтожении экземпляра репликации.
fn drop(&mut self) {
// Устанавливаем флаг завершения работы
// Это сигнализирует фоновому потоку о необходимости остановки
self.shutdown.store(true, Ordering::Relaxed);
// Ожидаем завершения фонового потока
if let Some(handle) = self.sync_handle.take() {
// Ждем завершения потока с таймаутом для избежания бесконечного ожидания
match handle.join() {
Ok(_) => log::debug!("Replication thread stopped successfully"),
Err(e) => log::error!("Error stopping replication thread: {:?}", e),
}
}
log::info!("LockFreeReplication instance destroyed");
}
}
// Юнит-тесты для модуля репликации
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
/// Тест создания и уничтожения репликации
#[test]
fn test_replication_creation_and_destruction() {
let replication = LockFreeReplication::new(
true,
vec!["http://test-node:8080".to_string()]
);
// Проверяем, что репликация создана и включена
assert!(replication.is_enabled());
assert_eq!(replication.get_pending_operations_count(), 0);
// Репликация будет автоматически уничтожена при выходе из scope
// и фоновый поток корректно остановится
}
/// Тест добавления операций в очередь репликации
#[test]
fn test_adding_operations() {
let replication = LockFreeReplication::new(
true,
vec!["http://test-node:8080".to_string()]
);
let operation = ReplicationOperation {
operation: "create".to_string(),
collection: "test_space".to_string(),
key: "test_key".to_string(),
value: json!({"field": "value"}),
timestamp: 1234567890,
};
// Добавляем операцию в очередь
replication.add_operation(operation);
// Проверяем, что операция добавлена в очередь
assert_eq!(replication.get_pending_operations_count(), 1);
}
/// Тест отключения репликации
#[test]
fn test_disabling_replication() {
let replication = LockFreeReplication::new(
true,
vec!["http://test-node:8080".to_string()]
);
// Выключаем репликацию
replication.disable();
// Проверяем, что репликация выключена
assert!(!replication.is_enabled());
let operation = ReplicationOperation {
operation: "create".to_string(),
collection: "test_space".to_string(),
key: "test_key".to_string(),
value: json!({"field": "value"}),
timestamp: 1234567890,
};
// Пытаемся добавить операцию при выключенной репликации
replication.add_operation(operation);
// Операция не должна быть добавлена в очередь
assert_eq!(replication.get_pending_operations_count(), 0);
}
/// Тест получения статистики
#[test]
fn test_getting_stats() {
let replication = LockFreeReplication::new(
true,
vec!["http://node1:8080".to_string(), "http://node2:8080".to_string()]
);
let stats = replication.get_stats();
// Проверяем корректность статистики
assert!(stats.enabled);
assert_eq!(stats.node_count, 2);
assert_eq!(stats.pending_operations, 0);
}
}