Delete src/mvcc.rs
This commit is contained in:
parent
2961a799f1
commit
f0652e8483
209
src/mvcc.rs
209
src/mvcc.rs
@ -1,209 +0,0 @@
|
|||||||
//! Модуль управления многопоточной согласованностью версий (MVCC)
|
|
||||||
//!
|
|
||||||
//! Реализует механизм MVCC для wait-free чтения и изоляции транзакций
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
|
||||||
use dashmap::DashMap;
|
|
||||||
|
|
||||||
/// Версия данных для MVCC
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Version {
|
|
||||||
pub value: crate::parser::sql::Value,
|
|
||||||
pub created_tx: u64,
|
|
||||||
pub expired_tx: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Хранилище MVCC
|
|
||||||
pub struct MvccStorage {
|
|
||||||
data: DashMap<String, Vec<Version>>,
|
|
||||||
next_tx_id: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MvccStorage {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
data: DashMap::new(),
|
|
||||||
next_tx_id: AtomicU64::new(1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Создание новой транзакции
|
|
||||||
pub fn begin_transaction(&self) -> u64 {
|
|
||||||
self.next_tx_id.fetch_add(1, Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Запись значения с созданием новой версии
|
|
||||||
pub fn write(&self, column: &str, value: crate::parser::sql::Value, tx_id: u64) {
|
|
||||||
let version = Version {
|
|
||||||
value,
|
|
||||||
created_tx: tx_id,
|
|
||||||
expired_tx: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Помечаем предыдущую версию как устаревшую
|
|
||||||
if let Some(mut versions) = self.data.get_mut(column) {
|
|
||||||
if let Some(last_version) = versions.last_mut() {
|
|
||||||
last_version.expired_tx = Some(tx_id);
|
|
||||||
}
|
|
||||||
versions.push(version);
|
|
||||||
} else {
|
|
||||||
self.data.insert(column.to_string(), vec![version]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение видимого значения для транзакции
|
|
||||||
pub fn get_visible_value(&self, column: &str, tx_id: u64) -> Option<crate::parser::sql::Value> {
|
|
||||||
if let Some(versions) = self.data.get(column) {
|
|
||||||
for version in versions.iter().rev() {
|
|
||||||
if version.created_tx <= tx_id &&
|
|
||||||
(version.expired_tx.is_none() || version.expired_tx.unwrap() > tx_id) {
|
|
||||||
return Some(version.value.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Фиксация транзакции
|
|
||||||
pub fn commit_transaction(&self, tx_id: u64) -> Result<(), MvccError> {
|
|
||||||
// В простой реализации просто записываем транзакцию как завершенную
|
|
||||||
// В реальной системе здесь была бы более сложная логика
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Откат транзакции
|
|
||||||
pub fn rollback_transaction(&self, tx_id: u64) -> Result<(), MvccError> {
|
|
||||||
// Удаляем все версии, созданные этой транзакцией
|
|
||||||
for mut entry in self.data.iter_mut() {
|
|
||||||
let versions = entry.value_mut();
|
|
||||||
// Удаляем версии, созданные этой транзакцией
|
|
||||||
versions.retain(|v| v.created_tx != tx_id);
|
|
||||||
// Обновляем expired_tx для версий, которые ссылались на эту транзакцию
|
|
||||||
for version in versions.iter_mut() {
|
|
||||||
if version.expired_tx == Some(tx_id) {
|
|
||||||
version.expired_tx = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Проверка соответствия строки условию WHERE с учетом MVCC
|
|
||||||
pub fn matches_where(&self, clause: &crate::parser::sql::WhereClause, tx_id: u64) -> bool {
|
|
||||||
// Извлекаем имя столбца из условия
|
|
||||||
let column_name = match &clause.left {
|
|
||||||
crate::parser::sql::Expression::Column(name) => name,
|
|
||||||
_ => return false,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Извлекаем значение для сравнения
|
|
||||||
let clause_value = match &clause.right {
|
|
||||||
Some(crate::parser::sql::Expression::Value(value)) => value,
|
|
||||||
None => {
|
|
||||||
// Обработка IS NULL и IS NOT NULL
|
|
||||||
let value = self.get_visible_value(column_name, tx_id);
|
|
||||||
return match &clause.operator {
|
|
||||||
crate::parser::sql::Operator::IsNull => matches!(value, None),
|
|
||||||
crate::parser::sql::Operator::IsNotNull => value.is_some(),
|
|
||||||
_ => false,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
_ => return false,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Получаем текущее значение столбца
|
|
||||||
if let Some(value) = self.get_visible_value(column_name, tx_id) {
|
|
||||||
match &clause.operator {
|
|
||||||
crate::parser::sql::Operator::Eq => value == *clause_value,
|
|
||||||
crate::parser::sql::Operator::Ne => value != *clause_value,
|
|
||||||
crate::parser::sql::Operator::Gt => Self::value_gt(&value, clause_value),
|
|
||||||
crate::parser::sql::Operator::Lt => Self::value_lt(&value, clause_value),
|
|
||||||
crate::parser::sql::Operator::Ge => value == *clause_value || Self::value_gt(&value, clause_value),
|
|
||||||
crate::parser::sql::Operator::Le => value == *clause_value || Self::value_lt(&value, clause_value),
|
|
||||||
crate::parser::sql::Operator::Like => {
|
|
||||||
if let (crate::parser::sql::Value::Text(s), crate::parser::sql::Value::Text(pattern)) = (&value, clause_value) {
|
|
||||||
pattern == "%" || s.contains(pattern.trim_matches('%'))
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => false, // Другие операторы пока не поддерживаются
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Сравнение значений (больше)
|
|
||||||
fn value_gt(v1: &crate::parser::sql::Value, v2: &crate::parser::sql::Value) -> bool {
|
|
||||||
match (v1, v2) {
|
|
||||||
(crate::parser::sql::Value::Integer(a), crate::parser::sql::Value::Integer(b)) => a > b,
|
|
||||||
(crate::parser::sql::Value::Float(a), crate::parser::sql::Value::Float(b)) => a > b,
|
|
||||||
(crate::parser::sql::Value::Text(a), crate::parser::sql::Value::Text(b)) => a > b,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Сравнение значений (меньше)
|
|
||||||
fn value_lt(v1: &crate::parser::sql::Value, v2: &crate::parser::sql::Value) -> bool {
|
|
||||||
match (v1, v2) {
|
|
||||||
(crate::parser::sql::Value::Integer(a), crate::parser::sql::Value::Integer(b)) => a < b,
|
|
||||||
(crate::parser::sql::Value::Float(a), crate::parser::sql::Value::Float(b)) => a < b,
|
|
||||||
(crate::parser::sql::Value::Text(a), crate::parser::sql::Value::Text(b)) => a < b,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Получение снимка данных на момент транзакции
|
|
||||||
pub fn get_snapshot(&self, tx_id: u64) -> HashMap<String, crate::parser::sql::Value> {
|
|
||||||
let mut snapshot = HashMap::new();
|
|
||||||
|
|
||||||
for entry in self.data.iter() {
|
|
||||||
let column = entry.key();
|
|
||||||
if let Some(value) = self.get_visible_value(column, tx_id) {
|
|
||||||
snapshot.insert(column.clone(), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Очистка старых версий
|
|
||||||
pub fn vacuum(&self, min_tx_id: u64) -> usize {
|
|
||||||
let mut removed = 0;
|
|
||||||
|
|
||||||
for mut entry in self.data.iter_mut() {
|
|
||||||
let versions = entry.value_mut();
|
|
||||||
let original_len = versions.len();
|
|
||||||
|
|
||||||
// Удаляем версии, которые были созданы до min_tx_id и устарели
|
|
||||||
versions.retain(|v| {
|
|
||||||
if v.created_tx >= min_tx_id {
|
|
||||||
true
|
|
||||||
} else if let Some(expired_tx) = v.expired_tx {
|
|
||||||
expired_tx >= min_tx_id
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
removed += original_len - versions.len();
|
|
||||||
}
|
|
||||||
|
|
||||||
removed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ошибки MVCC
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
pub enum MvccError {
|
|
||||||
#[error("Transaction conflict: {0}")]
|
|
||||||
TransactionConflict(String),
|
|
||||||
|
|
||||||
#[error("Transaction not found: {0}")]
|
|
||||||
TransactionNotFound(u64),
|
|
||||||
|
|
||||||
#[error("Serialization failure: {0}")]
|
|
||||||
SerializationFailure(String),
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user