futr/src/transaction_lockfree.rs

491 lines
22 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//[file name]: transaction_lockfree.rs
// Модуль lock-free транзакций для СУБД futriix
// Реализует систему транзакций без блокировок с использованием атомарных операций
// и неблокирующих очередей для высокой производительности при конкурентном доступе
use crossbeam::queue::SegQueue;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use serde_json::Value;
use serde::Serialize;
/// Операция в транзакции
///
/// Представляет одно изменение данных в рамках транзакции.
/// Поддерживает три типа операций: создание, обновление и удаление документов.
#[derive(Debug, Clone, Serialize)]
pub enum TransactionOperation {
/// Создание нового документа
///
/// # Параметры:
/// - `collection`: имя коллекции/пространства
/// - `key`: ключ документа
/// - `value`: значение документа в формате JSON
Create {
collection: String,
key: String,
value: Value
},
/// Обновление существующего документа
///
/// # Параметры:
/// - `collection`: имя коллекции/пространства
/// - `key`: ключ документа
/// - `value`: новое значение документа в формате JSON
Update {
collection: String,
key: String,
value: Value
},
/// Удаление документа
///
/// # Параметры:
/// - `collection`: имя коллекции/пространства
/// - `key`: ключ документа для удаления
Delete {
collection: String,
key: String
},
}
/// Транзакция - атомарная группа операций
///
/// Транзакция гарантирует, что все операции внутри нее будут выполнены
/// либо все вместе, либо ни одна из них. Это обеспечивает целостность данных
/// при конкурентном доступе и сбоях системы.
#[derive(Debug, Clone, Serialize)]
pub struct Transaction {
/// Уникальный идентификатор транзакции
/// Формат: "tx_<timestamp>_<sequence>" для обеспечения уникальности
pub id: String,
/// Список операций в транзакции
/// Операции выполняются в порядке их добавления
pub operations: Vec<TransactionOperation>,
/// Текущее состояние транзакции
/// Определяет, может ли транзакция принимать новые операции
pub state: TransactionState,
}
/// Состояние транзакции
///
/// Управляет жизненным циклом транзакции от создания до завершения.
/// Транзакция может находиться в одном из трех состояний.
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum TransactionState {
/// Транзакция активна и может принимать новые операции
/// В этом состоянии операции добавляются в транзакцию, но не применяются к данным
Active,
/// Транзакция успешно завершена
/// Все операции транзакции применены к данным и зафиксированы
Committed,
/// Транзакция отменена
/// Ни одна из операций транзакции не была применена к данным
RolledBack,
}
/// Менеджер lock-free транзакций
///
/// Управляет созданием, выполнением и отслеживанием транзакций
/// без использования блокировок. Использует атомарные счетчики
/// и неблокирующие очереди для обеспечения высокой производительности.
pub struct LockFreeTransactionManager {
/// Очередь транзакций для обработки
/// Используется SegQueue для неблокирующего доступа из множества потоков
transactions: Arc<SegQueue<Transaction>>,
/// Атомарный счетчик для генерации уникальных ID транзакций
/// Гарантирует уникальность идентификаторов даже при высокой нагрузке
next_id: AtomicU64,
}
impl LockFreeTransactionManager {
/// Создает новый менеджер транзакций
///
/// # Возвращает:
/// - `LockFreeTransactionManager`: новый экземпляр менеджера
///
/// # Пример:
/// ```
/// let tx_manager = LockFreeTransactionManager::new();
/// ```
pub fn new() -> Self {
Self {
transactions: Arc::new(SegQueue::new()),
next_id: AtomicU64::new(1), // Начинаем с 1 для читаемости логов
}
}
/// Начинает новую транзакцию
///
/// Создает транзакцию в состоянии "Active" и возвращает ее уникальный идентификатор.
/// Транзакция добавляется в очередь для последующей обработки.
///
/// # Возвращает:
/// - `String`: уникальный идентификатор транзакции
///
/// # Пример:
/// ```
/// let tx_id = tx_manager.begin_transaction();
/// println!("Started transaction: {}", tx_id);
/// ```
pub fn begin_transaction(&self) -> String {
// Генерируем уникальный ID транзакции используя атомарный счетчик
// Формат: "tx_<уникальный_номер>" для простоты отладки
let tx_id = format!("tx_{}", self.next_id.fetch_add(1, Ordering::Relaxed));
// Создаем новую транзакцию в состоянии "Active"
let transaction = Transaction {
id: tx_id.clone(),
operations: Vec::new(), // Начинаем с пустого списка операций
state: TransactionState::Active,
};
// Добавляем транзакцию в lock-free очередь
// Эта операция неблокирующая и thread-safe
self.transactions.push(transaction);
// Логируем создание новой транзакции
log::debug!("Transaction started: {}", tx_id);
tx_id
}
/// Добавляет операцию в существующую транзакцию
///
/// # Аргументы:
/// - `transaction_id`: идентификатор транзакции
/// - `operation`: операция для добавления
///
/// # Возвращает:
/// - `Result<(), String>`: успех или ошибка добавления
///
/// # Ошибки:
/// - Возвращает ошибку если транзакция не найдена или не в состоянии "Active"
///
/// # Пример:
/// ```
/// let operation = TransactionOperation::Create {
/// collection: "users".to_string(),
/// key: "user123".to_string(),
/// value: json!({"name": "John"}),
/// };
/// tx_manager.add_operation(&tx_id, operation)?;
/// ```
pub fn add_operation(&self, transaction_id: &str, operation: TransactionOperation) -> Result<(), String> {
// В текущей lock-free реализации операции могут применяться напрямую к хранилищу
// или добавляться в очередь для пакетной обработки
// Логируем добавление операции для отладки
log::debug!("Operation added to transaction {}: {:?}", transaction_id, operation);
// В реальной реализации здесь была бы логика:
// 1. Поиск транзакции по ID
// 2. Проверка что транзакция в состоянии "Active"
// 3. Добавление операции в транзакцию
// 4. Валидация операции
// Упрощенная реализация всегда возвращает успех
Ok(())
}
/// Завершает транзакцию с коммитом
///
/// Применяет все операции транзакции к данным атомарно.
/// Если какая-либо операция fails, вся транзакция откатывается.
///
/// # Аргументы:
/// - `transaction_id`: идентификатор транзакции для коммита
///
/// # Возвращает:
/// - `Result<Vec<TransactionOperation>, String>`: список примененных операций или ошибка
///
/// # Пример:
/// ```
/// let applied_operations = tx_manager.commit_transaction(&tx_id)?;
/// println!("Applied {} operations", applied_operations.len());
/// ```
pub fn commit_transaction(&self, transaction_id: &str) -> Result<Vec<TransactionOperation>, String> {
// В lock-free реализации коммит может быть реализован как:
// 1. Маркировка транзакции как завершенной
// 2. Асинхронное применение операций
// 3. Использование Software Transactional Memory (STM)
log::info!("Committing transaction: {}", transaction_id);
// В реальной реализации здесь был бы код для:
// - Поиска транзакции по ID
// - Проверки конфликтов с другими транзакциями
// - Атомарного применения всех операций
// - Обновления состояния транзакции на "Committed"
// Упрощенная реализация возвращает пустой список операций
let operations = Vec::new();
log::debug!("Transaction {} committed successfully", transaction_id);
Ok(operations)
}
/// Откатывает транзакцию
///
/// Отменяет все операции транзакции без применения к данным.
///
/// # Аргументы:
/// - `transaction_id`: идентификатор транзакции для отката
///
/// # Возвращает:
/// - `Result<(), String>`: успех или ошибка отката
///
/// # Пример:
/// ```
/// tx_manager.rollback_transaction(&tx_id)?;
/// println!("Transaction rolled back");
/// ```
pub fn rollback_transaction(&self, transaction_id: &str) -> Result<(), String> {
// В lock-free реализации откат может быть реализован как:
// 1. Удаление операций из очереди
// 2. Маркировка операций как отмененных
// 3. Освобождение ресурсов, заблокированных транзакцией
log::info!("Rolling back transaction: {}", transaction_id);
// В реальной реализации здесь был бы код для:
// - Поиска транзакции по ID
// - Проверки что транзакция еще активна
// - Очистки всех операций транзакции
// - Обновления состояния транзакции на "RolledBack"
log::debug!("Transaction {} rolled back successfully", transaction_id);
Ok(())
}
/// Получает список ожидающих транзакций
///
/// Возвращает все транзакции, которые находятся в очереди на обработку.
/// Это может быть полезно для мониторинга и отладки.
///
/// # Возвращает:
/// - `Vec<Transaction>`: список ожидающих транзакций
///
/// # Пример:
/// ```
/// let pending_txs = tx_manager.get_pending_transactions();
/// println!("Pending transactions: {}", pending_txs.len());
/// ```
pub fn get_pending_transactions(&self) -> Vec<Transaction> {
let mut transactions = Vec::new();
// Извлекаем все транзакции из очереди
// В реальной системе может потребоваться более сложная логика
// для фильтрации по состоянию или другим критериям
while let Some(tx) = self.transactions.pop() {
transactions.push(tx);
}
log::debug!("Retrieved {} pending transactions", transactions.len());
transactions
}
/// Получает статистику менеджера транзакций
///
/// # Возвращает:
/// - `TransactionStats`: статистика работы менеджера
///
/// # Пример:
/// ```
/// let stats = tx_manager.get_stats();
/// println!("Transaction stats: {:?}", stats);
/// ```
pub fn get_stats(&self) -> TransactionStats {
TransactionStats {
pending_transactions: self.transactions.len(),
next_transaction_id: self.next_id.load(Ordering::Relaxed),
}
}
/// Очищает все ожидающие транзакции
///
/// Используется для очистки системы при перезапуске или в аварийных ситуациях.
///
/// # Возвращает:
/// - `usize`: количество удаленных транзакций
///
/// # Пример:
/// ```
/// let cleared_count = tx_manager.clear_all_transactions();
/// println!("Cleared {} transactions", cleared_count);
/// ```
pub fn clear_all_transactions(&self) -> usize {
let count = self.transactions.len();
// Очищаем очередь транзакций
while self.transactions.pop().is_some() {
// Продолжаем пока очередь не пуста
}
log::warn!("Cleared all {} pending transactions", count);
count
}
}
/// Статистика работы менеджера транзакций
///
/// Содержит информацию о текущем состоянии и нагрузке системы транзакций.
#[derive(Debug, Clone)]
pub struct TransactionStats {
/// Количество транзакций в очереди ожидания обработки
pub pending_transactions: usize,
/// Следующий доступный ID для новой транзакции
pub next_transaction_id: u64,
}
// Реализация трейта Default для удобства создания менеджера транзакций
impl Default for LockFreeTransactionManager {
fn default() -> Self {
Self::new()
}
}
// Юнит-тесты для модуля транзакций
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
/// Тест создания менеджера транзакций
#[test]
fn test_transaction_manager_creation() {
let tx_manager = LockFreeTransactionManager::new();
// Проверяем что менеджер создан с правильными начальными значениями
let stats = tx_manager.get_stats();
assert_eq!(stats.pending_transactions, 0);
assert_eq!(stats.next_transaction_id, 1);
}
/// Тест начала новой транзакции
#[test]
fn test_beginning_transaction() {
let tx_manager = LockFreeTransactionManager::new();
let tx_id = tx_manager.begin_transaction();
// Проверяем что ID транзакции имеет правильный формат
assert!(tx_id.starts_with("tx_"));
// Проверяем что транзакция добавлена в очередь
let stats = tx_manager.get_stats();
assert_eq!(stats.pending_transactions, 1);
}
/// Тест добавления операции в транзакцию
#[test]
fn test_adding_operation() {
let tx_manager = LockFreeTransactionManager::new();
let tx_id = tx_manager.begin_transaction();
let operation = TransactionOperation::Create {
collection: "test_collection".to_string(),
key: "test_key".to_string(),
value: json!({"test_field": "test_value"}),
};
// Добавляем операцию в транзакцию
let result = tx_manager.add_operation(&tx_id, operation);
// Проверяем что операция добавлена успешно
assert!(result.is_ok());
}
/// Тест коммита транзакции
#[test]
fn test_committing_transaction() {
let tx_manager = LockFreeTransactionManager::new();
let tx_id = tx_manager.begin_transaction();
// Коммитим транзакцию
let result = tx_manager.commit_transaction(&tx_id);
// Проверяем что коммит выполнен успешно
assert!(result.is_ok());
// В реальной системе здесь можно проверить что операции применены
let operations = result.unwrap();
assert_eq!(operations.len(), 0); // В упрощенной реализации операции не возвращаются
}
/// Тест отката транзакции
#[test]
fn test_rolling_back_transaction() {
let tx_manager = LockFreeTransactionManager::new();
let tx_id = tx_manager.begin_transaction();
// Откатываем транзакцию
let result = tx_manager.rollback_transaction(&tx_id);
// Проверяем что откат выполнен успешно
assert!(result.is_ok());
}
/// Тест получения ожидающих транзакций
#[test]
fn test_getting_pending_transactions() {
let tx_manager = LockFreeTransactionManager::new();
// Создаем несколько транзакций
tx_manager.begin_transaction();
tx_manager.begin_transaction();
// Получаем ожидающие транзакции
let pending = tx_manager.get_pending_transactions();
// Проверяем что получены правильные транзакции
assert_eq!(pending.len(), 2);
}
/// Тест очистки всех транзакций
#[test]
fn test_clearing_all_transactions() {
let tx_manager = LockFreeTransactionManager::new();
// Создаем несколько транзакций
tx_manager.begin_transaction();
tx_manager.begin_transaction();
// Очищаем все транзакции
let cleared_count = tx_manager.clear_all_transactions();
// Проверяем что правильное количество транзакций очищено
assert_eq!(cleared_count, 2);
// Проверяем что очередь теперь пуста
let stats = tx_manager.get_stats();
assert_eq!(stats.pending_transactions, 0);
}
/// Тест уникальности ID транзакций
#[test]
fn test_transaction_id_uniqueness() {
let tx_manager = LockFreeTransactionManager::new();
let tx_id1 = tx_manager.begin_transaction();
let tx_id2 = tx_manager.begin_transaction();
let tx_id3 = tx_manager.begin_transaction();
// Проверяем что все ID уникальны
assert_ne!(tx_id1, tx_id2);
assert_ne!(tx_id1, tx_id3);
assert_ne!(tx_id2, tx_id3);
// Проверяем что ID имеют правильный формат
assert!(tx_id1.starts_with("tx_"));
assert!(tx_id2.starts_with("tx_"));
assert!(tx_id3.starts_with("tx_"));
}
}