Upload files to "src"

This commit is contained in:
Григорий Сафронов 2025-09-22 20:13:31 +00:00
parent 2f84cde99b
commit 423c5dec37

787
src/db.rs Normal file
View File

@ -0,0 +1,787 @@
use ansi_term::Colour;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::time::interval;
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Document {
pub id: String,
pub data: HashMap<String, serde_json::Value>,
pub version: u64,
}
#[derive(Debug)]
pub struct Index {
primary: RwLock<BTreeMap<String, String>>, // key -> document_id
secondary: RwLock<HashMap<String, Vec<String>>>, // field_value -> document_ids
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Transaction {
pub id: String,
pub operations: Vec<Operation>,
pub committed: bool,
pub created_at: u64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Operation {
Insert { doc: Document },
Update { doc_id: String, data: HashMap<String, serde_json::Value> },
Delete { doc_id: String },
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AclRule {
pub user: String,
pub permission: Permission,
pub collections: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum Permission {
ReadOnly,
ReadWrite,
Admin,
}
#[derive(Debug)]
pub struct Shard {
pub data: RwLock<HashMap<String, Document>>,
pub index: Index,
pub wal: Option<Wal>,
}
#[derive(Debug)]
pub struct Wal {
file: std::sync::Mutex<File>,
sequence: AtomicU64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeInfo {
pub id: String,
pub address: String,
pub port: u16,
pub status: NodeStatus,
pub last_seen: u64,
pub version: u64,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub enum NodeStatus {
Alive,
Suspect,
Dead,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GossipMessage {
pub from_node: String,
pub nodes: Vec<NodeInfo>,
pub sequence: u64,
pub timestamp: u64,
}
#[derive(Debug)]
pub struct GossipManager {
pub node_id: String,
pub nodes: RwLock<HashMap<String, NodeInfo>>,
pub sequence: AtomicU64,
pub tx: mpsc::Sender<GossipMessage>,
pub rx: mpsc::Receiver<GossipMessage>,
pub config: DatabaseConfig,
}
#[derive(Debug)]
pub struct FutriixDB {
pub shards: Vec<Arc<Shard>>,
pub config: DatabaseConfig,
pub transactions: RwLock<HashMap<String, Transaction>>,
pub lua_modules: RwLock<HashMap<String, String>>, // name -> lua code
pub gossip: Arc<GossipManager>,
pub acl_rules: RwLock<Vec<AclRule>>,
pub backups: RwLock<HashMap<String, BackupInfo>>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BackupInfo {
pub id: String,
pub timestamp: u64,
pub size: u64,
pub collections: Vec<String>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
pub data_dir: String,
pub wal_enabled: bool,
pub wal_dir: String,
pub backup_dir: String,
pub replication_enabled: bool,
pub replication_nodes: Vec<String>,
pub sharding_enabled: bool,
pub shards: usize,
pub cluster_mode: bool,
pub gossip_port: u16,
pub gossip_interval_ms: u64,
pub node_id: Option<String>,
pub acl_enabled: bool,
}
impl FutriixDB {
pub fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
// Создаем директории если не существуют
fs::create_dir_all(&config.data_dir)?;
if config.wal_enabled {
fs::create_dir_all(&config.wal_dir)?;
}
fs::create_dir_all(&config.backup_dir)?;
let mut shards = Vec::new();
for i in 0..config.shards {
let shard = Arc::new(Shard::new(config, i)?);
shards.push(shard);
}
// Инициализируем GOSSIP менеджер
let gossip = GossipManager::new(config)?;
// Инициализируем ACL правила по умолчанию
let acl_rules = RwLock::new(Vec::new());
Ok(Self {
shards,
config: config.clone(),
transactions: RwLock::new(HashMap::new()),
lua_modules: RwLock::new(HashMap::new()),
gossip: Arc::new(gossip),
acl_rules,
backups: RwLock::new(HashMap::new()),
})
}
pub fn insert(&self, key: &str, data: HashMap<String, serde_json::Value>) -> Result<(), String> {
let shard = self.get_shard(key);
let mut shard_data = shard.data.write();
let document = Document {
id: key.to_string(),
data,
version: 0,
};
if shard_data.contains_key(key) {
return Err("Key already exists".to_string());
}
// Добавляем в WAL если включен
if let Some(wal) = &shard.wal {
if let Err(e) = wal.write_operation(&Operation::Insert { doc: document.clone() }) {
return Err(format!("WAL write error: {}", e));
}
}
shard_data.insert(key.to_string(), document);
Ok(())
}
pub fn get(&self, key: &str) -> Result<Document, String> {
let shard = self.get_shard(key);
let shard_data = shard.data.read();
shard_data.get(key)
.cloned()
.ok_or_else(|| "Key not found".to_string())
}
pub fn update(&self, key: &str, data: HashMap<String, serde_json::Value>) -> Result<(), String> {
let shard = self.get_shard(key);
let mut shard_data = shard.data.write();
let doc = shard_data.get_mut(key)
.ok_or_else(|| "Key not found".to_string())?;
doc.data = data;
doc.version += 1;
if let Some(wal) = &shard.wal {
wal.write_operation(&Operation::Update {
doc_id: key.to_string(),
data: doc.data.clone()
})?;
}
Ok(())
}
pub fn delete(&self, key: &str) -> Result<(), String> {
let shard = self.get_shard(key);
let mut shard_data = shard.data.write();
if !shard_data.contains_key(key) {
return Err("Key not found".to_string());
}
if let Some(wal) = &shard.wal {
wal.write_operation(&Operation::Delete {
doc_id: key.to_string()
})?;
}
shard_data.remove(key);
Ok(())
}
fn get_shard(&self, key: &str) -> &Arc<Shard> {
let hash = self.hash_key(key);
&self.shards[hash % self.shards.len()]
}
fn hash_key(&self, key: &str) -> usize {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish() as usize
}
pub fn add_lua_module(&self, name: &str, code: &str) {
self.lua_modules.write().insert(name.to_string(), code.to_string());
}
pub async fn start_gossip(&self) -> Result<(), Box<dyn std::error::Error>> {
if self.config.cluster_mode {
self.gossip.start().await?;
}
Ok(())
}
// Команды для управления кластером
pub fn gossip_add_node(&self, address: &str, port: u16) -> Result<(), String> {
if !self.config.cluster_mode {
return Err("Cluster mode is disabled".to_string());
}
let node_id = format!("{}:{}", address, port);
let node_info = NodeInfo {
id: node_id.clone(),
address: address.to_string(),
port,
status: NodeStatus::Alive,
last_seen: chrono::Utc::now().timestamp_millis() as u64,
version: 0,
};
self.gossip.update_node(node_info);
println!("GOSSIP: Node {}:{} added to cluster", address, port);
Ok(())
}
pub fn gossip_remove_node(&self, node_id: &str) -> Result<(), String> {
if !self.config.cluster_mode {
return Err("Cluster mode is disabled".to_string());
}
let mut nodes = self.gossip.nodes.write();
if nodes.remove(node_id).is_some() {
println!("GOSSIP: Node {} removed from cluster", node_id);
Ok(())
} else {
Err(format!("Node {} not found", node_id))
}
}
pub fn gossip_list_nodes(&self) -> Result<Vec<NodeInfo>, String> {
if !self.config.cluster_mode {
return Err("Cluster mode is disabled".to_string());
}
Ok(self.gossip.get_nodes())
}
pub fn gossip_reconfigure(&self, new_nodes: Vec<(String, u16)>) -> Result<(), String> {
if !self.config.cluster_mode {
return Err("Cluster mode is disabled".to_string());
}
let nodes_count = new_nodes.len();
let mut nodes = self.gossip.nodes.write();
nodes.clear();
for (address, port) in new_nodes {
let node_id = format!("{}:{}", address, port);
let node_info = NodeInfo {
id: node_id.clone(),
address: address.to_string(),
port,
status: NodeStatus::Alive,
last_seen: chrono::Utc::now().timestamp_millis() as u64,
version: 0,
};
nodes.insert(node_id, node_info);
}
println!("GOSSIP: Cluster reconfigured with {} nodes", nodes_count);
Ok(())
}
// Команды для управления транзакциями
pub fn begin_transaction(&self) -> String {
let tx_id = Uuid::new_v4().to_string();
let transaction = Transaction {
id: tx_id.clone(),
operations: Vec::new(),
committed: false,
created_at: chrono::Utc::now().timestamp_millis() as u64,
};
self.transactions.write().insert(tx_id.clone(), transaction);
println!("TRANSACTION: Started transaction {}", tx_id);
tx_id
}
pub fn commit_transaction(&self, tx_id: &str) -> Result<(), String> {
let mut transactions = self.transactions.write();
if let Some(mut transaction) = transactions.remove(tx_id) {
transaction.committed = true;
// Применяем операции транзакции
for op in &transaction.operations {
match op {
Operation::Insert { doc } => {
if let Err(e) = self.insert(&doc.id, doc.data.clone()) {
return Err(format!("Failed to commit transaction: {}", e));
}
}
Operation::Update { doc_id, data } => {
if let Err(e) = self.update(doc_id, data.clone()) {
return Err(format!("Failed to commit transaction: {}", e));
}
}
Operation::Delete { doc_id } => {
if let Err(e) = self.delete(doc_id) {
return Err(format!("Failed to commit transaction: {}", e));
}
}
}
}
println!("TRANSACTION: Committed transaction {}", tx_id);
Ok(())
} else {
Err(format!("Transaction {} not found", tx_id))
}
}
pub fn rollback_transaction(&self, tx_id: &str) -> Result<(), String> {
let mut transactions = self.transactions.write();
if transactions.remove(tx_id).is_some() {
println!("TRANSACTION: Rolled back transaction {}", tx_id);
Ok(())
} else {
Err(format!("Transaction {} not found", tx_id))
}
}
pub fn add_operation_to_transaction(&self, tx_id: &str, operation: Operation) -> Result<(), String> {
let mut transactions = self.transactions.write();
if let Some(transaction) = transactions.get_mut(tx_id) {
transaction.operations.push(operation);
Ok(())
} else {
Err(format!("Transaction {} not found", tx_id))
}
}
// Команды для управления индексами
pub fn create_secondary_index(&self, index_name: &str, field: &str) -> Result<(), String> {
for shard in &self.shards {
let mut index = shard.index.secondary.write();
index.insert(index_name.to_string(), Vec::new());
}
println!("INDEX: Created secondary index '{}' for field '{}'", index_name, field);
Ok(())
}
pub fn drop_secondary_index(&self, index_name: &str) -> Result<(), String> {
for shard in &self.shards {
let mut index = shard.index.secondary.write();
index.remove(index_name);
}
println!("INDEX: Dropped secondary index '{}'", index_name);
Ok(())
}
pub fn list_indexes(&self) -> Vec<String> {
let shard = &self.shards[0];
let index = shard.index.secondary.read();
index.keys().cloned().collect()
}
// Команды для управления резервными копиями
pub fn create_backup(&self, backup_id: &str) -> Result<(), String> {
let backup_path = PathBuf::from(&self.config.backup_dir).join(format!("{}.backup", backup_id));
let mut file = OpenOptions::new()
.create(true)
.write(true)
.open(&backup_path)
.map_err(|e| format!("Failed to create backup file: {}", e))?;
// Собираем все данные для резервной копии
let mut backup_data = HashMap::new();
for (i, shard) in self.shards.iter().enumerate() {
let shard_data = shard.data.read();
let mut shard_backup = HashMap::new();
for (key, doc) in shard_data.iter() {
shard_backup.insert(key.clone(), doc.clone());
}
backup_data.insert(format!("shard_{}", i), shard_backup);
}
// Сериализуем и записываем данные
let serialized = rmp_serde::to_vec(&backup_data)
.map_err(|e| format!("Failed to serialize backup: {}", e))?;
file.write_all(&serialized)
.map_err(|e| format!("Failed to write backup: {}", e))?;
let backup_info = BackupInfo {
id: backup_id.to_string(),
timestamp: chrono::Utc::now().timestamp_millis() as u64,
size: serialized.len() as u64,
collections: vec!["all".to_string()],
};
self.backups.write().insert(backup_id.to_string(), backup_info);
println!("BACKUP: Created backup '{}'", backup_id);
Ok(())
}
pub fn restore_backup(&self, backup_id: &str) -> Result<(), String> {
let backup_path = PathBuf::from(&self.config.backup_dir).join(format!("{}.backup", backup_id));
let mut file = File::open(&backup_path)
.map_err(|e| format!("Failed to open backup file: {}", e))?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.map_err(|e| format!("Failed to read backup: {}", e))?;
let backup_data: HashMap<String, HashMap<String, Document>> = rmp_serde::from_slice(&buffer)
.map_err(|e| format!("Failed to deserialize backup: {}", e))?;
// Восстанавливаем данные
for (i, shard) in self.shards.iter().enumerate() {
let shard_key = format!("shard_{}", i);
if let Some(shard_backup) = backup_data.get(&shard_key) {
let mut shard_data = shard.data.write();
shard_data.clear();
for (key, doc) in shard_backup {
shard_data.insert(key.clone(), doc.clone());
}
}
}
println!("BACKUP: Restored from backup '{}'", backup_id);
Ok(())
}
pub fn delete_backup(&self, backup_id: &str) -> Result<(), String> {
let backup_path = PathBuf::from(&self.config.backup_dir).join(format!("{}.backup", backup_id));
fs::remove_file(&backup_path)
.map_err(|e| format!("Failed to delete backup file: {}", e))?;
self.backups.write().remove(backup_id);
println!("BACKUP: Deleted backup '{}'", backup_id);
Ok(())
}
pub fn list_backups(&self) -> Vec<BackupInfo> {
self.backups.read().values().cloned().collect()
}
// Команды для управления ACL
pub fn add_acl_rule(&self, user: &str, permission: Permission, collections: Vec<String>) -> Result<(), String> {
if !self.config.acl_enabled {
return Err("ACL is disabled".to_string());
}
let rule = AclRule {
user: user.to_string(),
permission,
collections,
};
let mut acl_rules = self.acl_rules.write();
acl_rules.push(rule);
println!("ACL: Added rule for user '{}'", user);
Ok(())
}
pub fn remove_acl_rule(&self, user: &str) -> Result<(), String> {
if !self.config.acl_enabled {
return Err("ACL is disabled".to_string());
}
let mut acl_rules = self.acl_rules.write();
let initial_len = acl_rules.len();
acl_rules.retain(|rule| rule.user != user);
if acl_rules.len() < initial_len {
println!("ACL: Removed rules for user '{}'", user);
Ok(())
} else {
Err(format!("No ACL rules found for user '{}'", user))
}
}
pub fn list_acl_rules(&self) -> Vec<AclRule> {
self.acl_rules.read().clone()
}
pub fn check_permission(&self, user: &str, collection: &str, operation: &str) -> bool {
if !self.config.acl_enabled {
return true; // Если ACL отключен, разрешаем все
}
let acl_rules = self.acl_rules.read();
for rule in acl_rules.iter() {
if rule.user == user && (rule.collections.contains(&"all".to_string()) || rule.collections.contains(&collection.to_string())) {
match (&rule.permission, operation) {
(Permission::Admin, _) => return true,
(Permission::ReadWrite, "read") | (Permission::ReadWrite, "write") => return true,
(Permission::ReadOnly, "read") => return true,
_ => continue,
}
}
}
false
}
}
impl Shard {
fn new(config: &DatabaseConfig, index: usize) -> Result<Self, Box<dyn std::error::Error>> {
let wal = if config.wal_enabled {
let wal_path = PathBuf::from(&config.wal_dir).join(format!("shard_{}.wal", index));
Some(Wal::new(wal_path)?)
} else {
None
};
Ok(Self {
data: RwLock::new(HashMap::new()),
index: Index::new(),
wal,
})
}
}
impl Index {
fn new() -> Self {
Self {
primary: RwLock::new(BTreeMap::new()),
secondary: RwLock::new(HashMap::new()),
}
}
}
impl Wal {
fn new(path: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
Ok(Self {
file: std::sync::Mutex::new(file),
sequence: AtomicU64::new(0),
})
}
fn write_operation(&self, operation: &Operation) -> Result<(), String> {
let mut buf = Vec::new();
let mut serializer = Serializer::new(&mut buf);
operation.serialize(&mut serializer)
.map_err(|e| format!("Serialization error: {}", e))?;
let mut file = self.file.lock().unwrap();
file.write_all(&buf)
.map_err(|e| format!("Write error: {}", e))?;
file.write_all(b"\n")
.map_err(|e| format!("Write error: {}", e))?;
self.sequence.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
impl GossipManager {
fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
let (tx, rx) = mpsc::channel(100);
// Используем node_id из конфигурации или генерируем случайный
let node_id = if let Some(configured_id) = &config.node_id {
configured_id.clone()
} else {
format!("node-{}", Uuid::new_v4().to_string()[..8].to_string())
};
println!("GOSSIP: Initializing with node ID: {}", node_id);
println!("GOSSIP: Cluster mode: {}", config.cluster_mode);
println!("GOSSIP: Port: {}", config.gossip_port);
println!("GOSSIP: Interval: {}ms", config.gossip_interval_ms);
Ok(Self {
node_id,
nodes: RwLock::new(HashMap::new()),
sequence: AtomicU64::new(0),
tx,
rx,
config: config.clone(),
})
}
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
if !self.config.cluster_mode {
return Ok(());
}
println!("GOSSIP: Starting gossip loop with {}ms interval", self.config.gossip_interval_ms);
let gossip_clone = self.clone_manager();
tokio::spawn(async move {
gossip_clone.run_gossip_loop().await;
});
println!("Cluster: started successfully");
Ok(())
}
async fn run_gossip_loop(&self) {
if !self.config.cluster_mode {
return;
}
let mut interval = interval(Duration::from_millis(self.config.gossip_interval_ms));
let mut broadcast_count = 0;
loop {
interval.tick().await;
// Ограничиваем частоту вывода сообщений в консоль
if broadcast_count % 10 == 0 {
let nodes_count = self.nodes.read().len();
if nodes_count > 0 {
println!("GOSSIP: Broadcasting to {} nodes", nodes_count);
}
}
self.broadcast_gossip().await;
self.cleanup_dead_nodes();
broadcast_count += 1;
}
}
async fn broadcast_gossip(&self) {
let nodes = self.nodes.read().clone();
let message = GossipMessage {
from_node: self.node_id.clone(),
nodes: nodes.values().cloned().collect(),
sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
timestamp: chrono::Utc::now().timestamp_millis() as u64,
};
// Здесь будет реализация рассылки сообщений другим узлам
// через сетевые сокеты или другие транспортные механизмы
}
fn cleanup_dead_nodes(&self) {
let mut nodes = self.nodes.write();
let now = chrono::Utc::now().timestamp_millis() as u64;
let mut removed_count = 0;
nodes.retain(|_, node| {
if node.status == NodeStatus::Dead {
removed_count += 1;
return false;
}
// Помечаем узлы как подозрительные если не видели их более 30 секунд
if now - node.last_seen > 30000 {
node.status = NodeStatus::Suspect;
println!("GOSSIP: Node {} marked as suspect", node.id);
}
// Помечаем узлы как мертвые если не видели их более 60 секунд
if now - node.last_seen > 60000 {
node.status = NodeStatus::Dead;
println!("GOSSIP: Node {} marked as dead", node.id);
removed_count += 1;
return false;
}
true
});
if removed_count > 0 {
println!("GOSSIP: Removed {} dead nodes", removed_count);
}
}
pub fn update_node(&self, node_info: NodeInfo) {
let node_id = node_info.id.clone();
let mut nodes = self.nodes.write();
nodes.insert(node_info.id.clone(), node_info);
println!("GOSSIP: Node {} updated", node_id);
}
pub fn get_nodes(&self) -> Vec<NodeInfo> {
let nodes = self.nodes.read();
nodes.values().cloned().collect()
}
fn clone_manager(&self) -> Arc<Self> {
// Создаем новый канал для клонированного менеджера
let (tx, rx) = mpsc::channel(100);
Arc::new(Self {
node_id: self.node_id.clone(),
nodes: RwLock::new(self.nodes.read().clone()),
sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
tx,
rx,
config: self.config.clone(),
})
}
}
// Реализация ручной клонирования для GossipManager
impl Clone for GossipManager {
fn clone(&self) -> Self {
// Создаем новый канал для клонированного менеджера
let (tx, rx) = mpsc::channel(100);
Self {
node_id: self.node_id.clone(),
nodes: RwLock::new(self.nodes.read().clone()),
sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
tx,
rx,
config: self.config.clone(),
}
}
}