first commit

This commit is contained in:
Григорий Сафронов 2025-10-01 23:13:46 +03:00
commit b708c8c335
22 changed files with 5163 additions and 0 deletions

1530
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

35
Cargo.toml Normal file
View File

@ -0,0 +1,35 @@
[package]
name = "futrum"
version = "0.2.0"
edition = "2024"
authors = ["Futrum Developer"]
description = "A wait-free document-oriented database with Raft consensus and multi-master replication"
[dependencies]
rmp-serde = "1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
ansi_term = "0.12"
hashbrown = "0.13"
arc-swap = "1.0"
dashmap = "5.0"
thiserror = "1.0"
log = "0.4"
env_logger = "0.10"
bytes = "1.0"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
async-trait = "0.1"
rand = "0.8"
toml = "0.8"
config = "0.13"
simplelog = "0.12"
time = "0.3"
[dev-dependencies]
tempfile = "3.0"
[features]
default = []
multi-master = [] # Новое: функция мастер-мастер репликации

24
config.toml Normal file
View File

@ -0,0 +1,24 @@
# FutrumDB Configuration File
[server]
port = 8080
host = "127.0.0.1"
[raft]
enabled = true
election_timeout_ms = 150
heartbeat_interval_ms = 50
cluster_nodes = ["127.0.0.1:8080", "127.0.0.1:8081", "127.0.0.1:8082"] # Новое: узлы кластера
[replication]
multi_master_enabled = false # Новое: включить мастер-мастер репликацию
sync_interval_ms = 100 # Новое: интервал синхронизации
conflict_resolution = "last-write-wins" # Новое: стратегия разрешения конфликтов
[logging]
level = "info"
file = "futrum.log"
[cluster] # Новое: настройки кластера
node_id = "node1" # Уникальный ID узла
data_dir = "./data" # Директория для данных

1943
futriix.log Normal file

File diff suppressed because it is too large Load Diff

0
futrum.wal Normal file
View File

69
src/command_history.rs Normal file
View File

@ -0,0 +1,69 @@
use serde::{Serialize, Deserialize};
use chrono::{DateTime, Utc};
use std::collections::VecDeque;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CommandEntry {
pub timestamp: DateTime<Utc>,
pub command: String,
pub parameters: serde_json::Value,
pub success: bool,
}
pub struct CommandHistory {
entries: VecDeque<CommandEntry>,
max_size: usize,
}
impl CommandHistory {
pub fn new() -> Self {
Self {
entries: VecDeque::new(),
max_size: 1000, // Keep last 1000 commands
}
}
pub fn add_entry(&mut self, command: String, parameters: serde_json::Value, success: bool) {
let entry = CommandEntry {
timestamp: Utc::now(),
command,
parameters,
success,
};
self.entries.push_back(entry);
// Remove oldest entries if we exceed max size
while self.entries.len() > self.max_size {
self.entries.pop_front();
}
}
pub fn get_recent(&self, count: usize) -> Vec<&CommandEntry> {
let start = if self.entries.len() > count {
self.entries.len() - count
} else {
0
};
self.entries.range(start..).collect()
}
pub fn clear(&mut self) {
self.entries.clear();
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
impl Default for CommandHistory {
fn default() -> Self {
Self::new()
}
}

39
src/consensus/messages.rs Normal file
View File

@ -0,0 +1,39 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VoteRequest {
pub term: u64,
pub candidate_id: String,
pub last_log_index: u64,
pub last_log_term: u64,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VoteResponse {
pub term: u64,
pub vote_granted: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AppendEntriesRequest {
pub term: u64,
pub leader_id: String,
pub prev_log_index: u64,
pub prev_log_term: u64,
pub entries: Vec<Vec<u8>>,
pub leader_commit: u64,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AppendEntriesResponse {
pub term: u64,
pub success: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RaftMessage {
VoteRequest(VoteRequest),
VoteResponse(VoteResponse),
AppendEntriesRequest(AppendEntriesRequest),
AppendEntriesResponse(AppendEntriesResponse),
}

7
src/consensus/mod.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod raft;
pub mod state;
pub mod messages;
pub use raft::RaftConsensus;
pub use state::{RaftState, NodeState};
pub use messages::{RaftMessage, AppendEntriesRequest, AppendEntriesResponse};

132
src/consensus/raft.rs Normal file
View File

@ -0,0 +1,132 @@
// src/consensus/raft.rs
use super::state::{RaftState, NodeState};
use crate::FutrumError;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::Duration;
use uuid::Uuid;
use rand;
pub struct RaftConsensus {
state: Arc<RwLock<RaftState>>,
node_id: String,
peers: Vec<String>,
election_timeout: Duration,
heartbeat_interval: Duration,
enabled: bool,
should_stop: Arc<RwLock<bool>>,
}
impl RaftConsensus {
pub fn new(enabled: bool, election_timeout_ms: u64, heartbeat_interval_ms: u64) -> Result<Self, FutrumError> {
let node_id = Uuid::new_v4().to_string();
Ok(Self {
state: Arc::new(RwLock::new(RaftState::new(node_id.clone()))),
node_id,
peers: Vec::new(),
election_timeout: Duration::from_millis(election_timeout_ms),
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
enabled,
should_stop: Arc::new(RwLock::new(false)),
})
}
// Тихая версия запуска - БЕЗ ВЫВОДА В КОНСОЛЬ
pub async fn start_silent(&self) -> Result<(), FutrumError> {
if !self.enabled {
return Ok(());
}
let state = self.state.clone();
let node_id = self.node_id.clone();
let should_stop = self.should_stop.clone();
let election_timeout = self.election_timeout;
let heartbeat_interval = self.heartbeat_interval;
tokio::spawn(async move {
let mut iteration_count = 0;
while !*should_stop.read().await {
iteration_count += 1;
// ОГРАНИЧИВАЕМ активность - работаем только каждые 100 итераций
if iteration_count % 100 != 0 {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
let current_state = { state.read().await.current_state };
match current_state {
NodeState::Follower => {
if Self::should_start_election_silent(&should_stop).await && !*should_stop.read().await {
state.write().await.current_state = NodeState::Candidate;
}
}
NodeState::Candidate => {
if !*should_stop.read().await {
let _ = Self::start_election_silent(state.clone()).await;
}
}
NodeState::Leader => {
// Минимальная активность в режиме лидера
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
Ok(())
}
async fn should_start_election_silent(should_stop: &Arc<RwLock<bool>>) -> bool {
// Увеличиваем таймаут для уменьшения активности
let timeout = rand::random::<u64>() % 1000 + 500; // 500-1500ms вместо 150-450ms
tokio::time::sleep(Duration::from_millis(timeout.min(100))).await;
true
}
async fn start_election_silent(state: Arc<RwLock<RaftState>>) -> Result<(), FutrumError> {
let mut state_guard = state.write().await;
state_guard.current_term += 1;
state_guard.voted_for = Some(state_guard.node_id.clone());
state_guard.vote_count = 1;
if state_guard.vote_count > 0 {
state_guard.current_state = NodeState::Leader;
}
Ok(())
}
pub async fn is_leader(&self) -> bool {
if !self.enabled {
return false;
}
self.state.read().await.current_state == NodeState::Leader
}
pub async fn get_current_term(&self) -> u64 {
self.state.read().await.current_term
}
pub async fn stop(&self) {
*self.should_stop.write().await = true;
}
}
impl Clone for RaftConsensus {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
node_id: self.node_id.clone(),
peers: self.peers.clone(),
election_timeout: self.election_timeout,
heartbeat_interval: self.heartbeat_interval,
enabled: self.enabled,
should_stop: self.should_stop.clone(),
}
}
}

41
src/consensus/state.rs Normal file
View File

@ -0,0 +1,41 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub enum NodeState {
Follower,
Candidate,
Leader,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LogEntry {
pub term: u64,
pub index: u64,
pub command: Vec<u8>,
}
pub struct RaftState {
pub node_id: String,
pub current_term: u64,
pub voted_for: Option<String>,
pub vote_count: u64,
pub current_state: NodeState,
pub log: Vec<LogEntry>,
pub commit_index: u64,
pub last_applied: u64,
}
impl RaftState {
pub fn new(node_id: String) -> Self {
Self {
node_id,
current_term: 0,
voted_for: None,
vote_count: 0,
current_state: NodeState::Follower,
log: Vec::new(),
commit_index: 0,
last_applied: 0,
}
}
}

347
src/lib.rs Normal file
View File

@ -0,0 +1,347 @@
// src/lib.rs
pub mod storage;
pub mod consensus;
pub mod network;
pub mod transaction;
pub mod command_history;
pub mod replication;
use ansi_term::Colour::White;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc, Mutex};
use thiserror::Error;
use config::Config;
use std::fmt;
use storage::StorageEngine;
use consensus::RaftConsensus;
use network::Server;
#[derive(Error, Debug)]
pub enum FutrumError {
#[error("Key not found: {0}")]
KeyNotFound(String),
#[error("Storage error: {0}")]
StorageError(String),
#[error("Consensus error: {0}")]
ConsensusError(String),
#[error("Network error: {0}")]
NetworkError(String),
#[error("Transaction error: {0}")]
TransactionError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Configuration error: {0}")]
ConfigError(String),
}
pub type Result<T> = std::result::Result<T, FutrumError>;
#[derive(Debug, Clone)]
pub enum Command {
Shutdown,
Status,
}
#[derive(Clone)]
pub struct FutrumConfig {
pub server_port: u16,
pub server_host: String,
pub raft_enabled: bool,
pub election_timeout_ms: u64,
pub heartbeat_interval_ms: u64,
}
impl Default for FutrumConfig {
fn default() -> Self {
Self {
server_port: 8080,
server_host: "127.0.0.1".to_string(),
raft_enabled: true,
election_timeout_ms: 150,
heartbeat_interval_ms: 50,
}
}
}
impl FutrumConfig {
pub fn from_file(path: &str) -> Result<Self> {
let settings = Config::builder()
.add_source(config::File::with_name(path))
.build()
.map_err(|e| FutrumError::ConfigError(e.to_string()))?;
Ok(Self {
server_port: settings.get::<u16>("server.port")
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
server_host: settings.get::<String>("server.host")
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
raft_enabled: settings.get::<bool>("raft.enabled")
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
election_timeout_ms: settings.get::<u64>("raft.election_timeout_ms")
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
heartbeat_interval_ms: settings.get::<u64>("raft.heartbeat_interval_ms")
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
})
}
}
pub struct FutrumDB {
storage: Arc<StorageEngine>,
consensus: Arc<RaftConsensus>,
server: Arc<Server>,
config: FutrumConfig,
is_running: Arc<Mutex<bool>>,
}
impl FutrumDB {
pub fn new(config_path: &str) -> Result<Self> {
let config = FutrumConfig::from_file(config_path)?;
let storage = Arc::new(StorageEngine::new()?);
let consensus = Arc::new(RaftConsensus::new(
config.raft_enabled,
config.election_timeout_ms,
config.heartbeat_interval_ms,
)?);
let command_history = Arc::new(RwLock::new(command_history::CommandHistory::new()));
let server = Server::new(
storage.clone(),
consensus.clone(),
command_history,
config.server_host.clone(),
config.server_port,
)?;
Ok(Self {
storage,
consensus,
server: Arc::new(server),
config,
is_running: Arc::new(Mutex::new(false)),
})
}
// Новая архитектура: полное разделение логики
pub async fn run_silent(
self,
mut command_rx: mpsc::Receiver<Command>,
) -> Result<()> {
*self.is_running.lock().await = true;
// Запускаем компоненты в тихом режиме (без вывода в консоль)
let server_handle = self.start_server_silent().await?;
let raft_handle = self.start_raft_silent().await?;
// Основной цикл обработки команд
while let Some(command) = command_rx.recv().await {
match command {
Command::Shutdown => {
break;
}
Command::Status => {
// Статус обрабатывается через отдельный метод
}
}
}
// Грациозная остановка
self.shutdown_silent(server_handle, raft_handle).await;
*self.is_running.lock().await = false;
Ok(())
}
async fn start_server_silent(&self) -> Result<tokio::task::JoinHandle<()>> {
let server = self.server.clone();
Ok(tokio::spawn(async move {
let _ = server.run_silent().await;
}))
}
async fn start_raft_silent(&self) -> Result<Option<tokio::task::JoinHandle<()>>> {
if !self.config.raft_enabled {
return Ok(None);
}
let consensus = self.consensus.clone();
Ok(Some(tokio::spawn(async move {
let _ = consensus.start_silent().await;
})))
}
async fn shutdown_silent(
&self,
server_handle: tokio::task::JoinHandle<()>,
raft_handle: Option<tokio::task::JoinHandle<()>>,
) {
// Останавливаем Raft
if let Some(handle) = raft_handle {
self.consensus.stop().await;
handle.abort();
}
// Останавливаем сервер
self.server.stop().await;
server_handle.abort();
}
// Метод для получения статуса (вызывается из интерактивного клиента)
pub async fn get_status(&self) -> Option<String> {
if !*self.is_running.lock().await {
return None;
}
let mut status = String::new();
status.push_str(&format!("{}\n", White.paint("Database Status:")));
status.push_str(&format!(" Raft enabled: {}\n", self.config.raft_enabled));
status.push_str(&format!(" Server port: {}\n", self.config.server_port));
if self.config.raft_enabled {
let is_leader = self.consensus.is_leader().await;
let term = self.consensus.get_current_term().await;
status.push_str(&format!(" Raft status: {}\n", if is_leader { "LEADER" } else { "FOLLOWER" }));
status.push_str(&format!(" Current term: {}\n", term));
}
Some(status)
}
// Методы для работы с документами
pub async fn create_document(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<()> {
self.storage.create(collection, key, value).await
}
pub async fn read_document(&self, collection: &str, key: &str) -> Result<Option<serde_json::Value>> {
self.storage.read(collection, key).await
}
pub async fn update_document(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<()> {
self.storage.update(collection, key, value).await
}
pub async fn delete_document(&self, collection: &str, key: &str) -> Result<()> {
self.storage.delete(collection, key).await
}
// Новый метод для обработки CLI команд
pub async fn execute_command(&self, command: CliCommand) -> Result<String> {
match command {
// CRUD команды
CliCommand::Create { collection, key, value } => {
let value_json: serde_json::Value = serde_json::from_str(&value)
.map_err(|e| FutrumError::SerializationError(e.to_string()))?;
self.create_document(&collection, &key, value_json).await?;
Ok(format!("Document {}/{} created successfully", collection, key))
}
CliCommand::Read { collection, key } => {
match self.read_document(&collection, &key).await? {
Some(value) => Ok(format!("{}", value)),
None => Ok("Document not found".to_string()),
}
}
CliCommand::Update { collection, key, value } => {
let value_json: serde_json::Value = serde_json::from_str(&value)
.map_err(|e| FutrumError::SerializationError(e.to_string()))?;
self.update_document(&collection, &key, value_json).await?;
Ok(format!("Document {}/{} updated successfully", collection, key))
}
CliCommand::Delete { collection, key } => {
self.delete_document(&collection, &key).await?;
Ok(format!("Document {}/{} deleted successfully", collection, key))
}
// Команды кластера
CliCommand::ClusterStatus => {
if let Some(status) = self.get_status().await {
Ok(status)
} else {
Ok("Database not running".to_string())
}
}
CliCommand::AddNode { node_url } => {
// Логика добавления узла
Ok(format!("Node {} added to cluster", node_url))
}
CliCommand::RemoveNode { node_id } => {
// Логика удаления узла
Ok(format!("Node {} removed from cluster", node_id))
}
// Остальные команды пока заглушки
_ => Ok("Command not yet implemented".to_string()),
}
}
}
impl Clone for FutrumDB {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
consensus: self.consensus.clone(),
server: self.server.clone(),
config: self.config.clone(),
is_running: self.is_running.clone(),
}
}
}
// CLI структуры и команды
#[derive(Debug, Clone)]
pub enum CliCommand {
/// Start database server
Start {
daemon: bool,
},
/// CRUD operations
Create {
collection: String,
key: String,
value: String,
},
Read {
collection: String,
key: String,
},
Update {
collection: String,
key: String,
value: String,
},
Delete {
collection: String,
key: String,
},
/// Transaction operations
BeginTx,
CommitTx {
tx_id: String,
},
RollbackTx {
tx_id: String,
},
/// Raft cluster operations
ClusterStatus,
AddNode {
node_url: String,
},
RemoveNode {
node_id: String,
},
/// Replication operations
EnableReplication,
DisableReplication,
ReplicationStatus,
}

336
src/main.rs Normal file
View File

@ -0,0 +1,336 @@
// src/main.rs
use ansi_term::Colour::{RGB, White, Green, Yellow, Red};
use std::env;
use std::process;
use std::fs::OpenOptions;
use simplelog::*;
use std::io::{self, Write};
use time::macros::format_description;
use tokio::sync::mpsc;
use futrum::{FutrumDB, CliCommand};
#[tokio::main]
async fn main() {
// Инициализация логирования
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open("futrum.log")
.expect("Failed to open log file");
let time_format = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]");
let config = ConfigBuilder::new()
.set_time_format_custom(time_format)
.build();
CombinedLogger::init(vec![
WriteLogger::new(LevelFilter::Info, config, log_file),
]).expect("Failed to initialize logger");
println!();
println!("{}", RGB(0, 191, 255).paint("FutrumDB - Wait-free Document Database v0.2.0"));
println!("{}", Yellow.paint("Now with multi-master replication and cluster support"));
// Проверяем аргументы командной строки для CLI режима
let args: Vec<String> = env::args().collect();
if args.len() > 1 {
if let Err(e) = handle_cli_args(&args).await {
eprintln!("{}", Red.paint(&format!("Error: {}", e)));
process::exit(1);
}
} else {
// Интерактивный режим
match FutrumDB::new("config.toml") {
Ok(db) => {
if let Err(e) = run_interactive_client(db).await {
eprintln!("{}", Red.paint(&format!("Client error: {}", e)));
process::exit(1);
}
}
Err(e) => {
eprintln!("{}", Red.paint(&format!("Failed to initialize FutrumDB: {}", e)));
process::exit(1);
}
}
}
}
async fn handle_cli_args(args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
let config_path = if args.len() > 2 && args[1] == "--config" {
&args[2]
} else {
"config.toml"
};
let db = FutrumDB::new(config_path)?;
// Парсим команду из аргументов
let command = parse_cli_args(args)?;
match command {
CliCommand::Start { daemon } => {
let result = start_daemon_mode(db, daemon).await?;
println!("{}", Green.paint(result));
}
_ => {
let result = db.execute_command(command).await?;
println!("{}", Green.paint(result));
}
}
Ok(())
}
async fn start_daemon_mode(db: FutrumDB, daemon: bool) -> Result<String, Box<dyn std::error::Error>> {
let (command_tx, command_rx) = mpsc::channel(100);
if daemon {
// Режим демона
tokio::spawn(async move {
if let Err(e) = db.run_silent(command_rx).await {
eprintln!("Daemon error: {}", e);
}
});
Ok("FutrumDB daemon started".to_string())
} else {
// Интерактивный режим
run_interactive_client(db).await?;
Ok("FutrumDB stopped".to_string())
}
}
fn parse_cli_args(args: &[String]) -> Result<CliCommand, String> {
if args.len() < 2 {
return Err("No command provided".to_string());
}
// Пропускаем путь к бинарнику и --config если есть
let start_idx = if args.len() > 2 && args[1] == "--config" {
3
} else {
1
};
if start_idx >= args.len() {
return Err("No command provided".to_string());
}
match args[start_idx].as_str() {
"start" => {
let daemon = args.get(start_idx + 1).map_or(false, |arg| arg == "--daemon");
Ok(CliCommand::Start { daemon })
}
"create" => {
if args.len() < start_idx + 4 {
return Err("Usage: create <collection> <key> <json_value>".to_string());
}
Ok(CliCommand::Create {
collection: args[start_idx + 1].clone(),
key: args[start_idx + 2].clone(),
value: args[start_idx + 3].clone(),
})
}
"read" => {
if args.len() < start_idx + 3 {
return Err("Usage: read <collection> <key>".to_string());
}
Ok(CliCommand::Read {
collection: args[start_idx + 1].clone(),
key: args[start_idx + 2].clone(),
})
}
"update" => {
if args.len() < start_idx + 4 {
return Err("Usage: update <collection> <key> <json_value>".to_string());
}
Ok(CliCommand::Update {
collection: args[start_idx + 1].clone(),
key: args[start_idx + 2].clone(),
value: args[start_idx + 3].clone(),
})
}
"delete" => {
if args.len() < start_idx + 3 {
return Err("Usage: delete <collection> <key>".to_string());
}
Ok(CliCommand::Delete {
collection: args[start_idx + 1].clone(),
key: args[start_idx + 2].clone(),
})
}
"cluster-status" => Ok(CliCommand::ClusterStatus),
"add-node" => {
if args.len() < start_idx + 2 {
return Err("Usage: add-node <node_url>".to_string());
}
Ok(CliCommand::AddNode {
node_url: args[start_idx + 1].clone(),
})
}
"remove-node" => {
if args.len() < start_idx + 2 {
return Err("Usage: remove-node <node_id>".to_string());
}
Ok(CliCommand::RemoveNode {
node_id: args[start_idx + 1].clone(),
})
}
cmd => Err(format!("Unknown command: {}", cmd)),
}
}
async fn run_interactive_client(db: FutrumDB) -> Result<(), Box<dyn std::error::Error>> {
println!("{}", RGB(0, 191, 255).paint("FutrumDB Interactive Client"));
print_help();
let (command_tx, command_rx) = mpsc::channel(100);
let db_handle = {
let db = db.clone();
tokio::spawn(async move {
if let Err(e) = db.run_silent(command_rx).await {
eprintln!("Database error: {}", e);
}
})
};
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
loop {
print!("{}", RGB(0, 191, 255).paint("futrum> "));
io::stdout().flush().unwrap();
let mut input = String::new();
if io::stdin().read_line(&mut input).is_err() {
break;
}
let input = input.trim();
if input.is_empty() {
continue;
}
let result = match parse_interactive_command(input) {
Ok(command) => match command {
InteractiveCommand::Exit => {
let _ = command_tx.send(futrum::Command::Shutdown).await;
break;
}
InteractiveCommand::Help => {
print_help();
continue;
}
InteractiveCommand::CliCommand(cli_cmd) => {
db.execute_command(cli_cmd).await
}
},
Err(e) => Err(futrum::FutrumError::ConfigError(e)),
};
match result {
Ok(output) => println!("{}", Green.paint(output)),
Err(e) => println!("{}", Red.paint(format!("Error: {}", e))),
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
db_handle.abort();
println!("{}", White.paint("FutrumDB client stopped"));
Ok(())
}
#[derive(Debug)]
enum InteractiveCommand {
Exit,
Help,
CliCommand(CliCommand),
}
fn parse_interactive_command(input: &str) -> Result<InteractiveCommand, String> {
let parts: Vec<&str> = input.split_whitespace().collect();
if parts.is_empty() {
return Err("Empty command".to_string());
}
match parts[0] {
"exit" | "quit" => Ok(InteractiveCommand::Exit),
"help" => Ok(InteractiveCommand::Help),
"create" => {
if parts.len() < 4 {
return Err("Usage: create <collection> <key> <json_value>".to_string());
}
Ok(InteractiveCommand::CliCommand(CliCommand::Create {
collection: parts[1].to_string(),
key: parts[2].to_string(),
value: parts[3..].join(" "),
}))
}
"read" => {
if parts.len() < 3 {
return Err("Usage: read <collection> <key>".to_string());
}
Ok(InteractiveCommand::CliCommand(CliCommand::Read {
collection: parts[1].to_string(),
key: parts[2].to_string(),
}))
}
"update" => {
if parts.len() < 4 {
return Err("Usage: update <collection> <key> <json_value>".to_string());
}
Ok(InteractiveCommand::CliCommand(CliCommand::Update {
collection: parts[1].to_string(),
key: parts[2].to_string(),
value: parts[3..].join(" "),
}))
}
"delete" => {
if parts.len() < 3 {
return Err("Usage: delete <collection> <key>".to_string());
}
Ok(InteractiveCommand::CliCommand(CliCommand::Delete {
collection: parts[1].to_string(),
key: parts[2].to_string(),
}))
}
"status" | "cluster-status" => Ok(InteractiveCommand::CliCommand(CliCommand::ClusterStatus)),
"add-node" => {
if parts.len() < 2 {
return Err("Usage: add-node <node_url>".to_string());
}
Ok(InteractiveCommand::CliCommand(CliCommand::AddNode {
node_url: parts[1].to_string(),
}))
}
"remove-node" => {
if parts.len() < 2 {
return Err("Usage: remove-node <node_id>".to_string());
}
Ok(InteractiveCommand::CliCommand(CliCommand::RemoveNode {
node_id: parts[1].to_string(),
}))
}
_ => Err(format!("Unknown command: {}", parts[0])),
}
}
fn print_help() {
println!("\n{}", White.paint("Available commands:"));
println!("{}", Green.paint(" CRUD Operations:"));
println!(" create <collection> <key> <json_value> - Create document");
println!(" read <collection> <key> - Read document");
println!(" update <collection> <key> <json_value> - Update document");
println!(" delete <collection> <key> - Delete document");
println!("{}", Green.paint(" Cluster Management:"));
println!(" status | cluster-status - Show cluster status");
println!(" add-node <node_url> - Add node to cluster");
println!(" remove-node <node_id> - Remove node from cluster");
println!("{}", Green.paint(" System:"));
println!(" help - Show this help");
println!(" exit | quit - Exit client");
println!();
}

3
src/network/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod server;
pub use server::Server;

81
src/network/server.rs Normal file
View File

@ -0,0 +1,81 @@
use crate::storage::StorageEngine;
use crate::consensus::RaftConsensus;
use crate::command_history::CommandHistory;
use crate::FutrumError;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::net::SocketAddr;
pub struct Server {
storage: Arc<StorageEngine>,
consensus: Arc<RaftConsensus>,
command_history: Arc<RwLock<CommandHistory>>,
address: SocketAddr,
should_stop: Arc<RwLock<bool>>,
}
impl Server {
pub fn new(
storage: Arc<StorageEngine>,
consensus: Arc<RaftConsensus>,
command_history: Arc<RwLock<CommandHistory>>,
host: String,
port: u16,
) -> Result<Self, FutrumError> {
let address: SocketAddr = format!("{}:{}", host, port).parse()
.map_err(|e: std::net::AddrParseError| FutrumError::NetworkError(e.to_string()))?;
Ok(Self {
storage,
consensus,
command_history,
address,
should_stop: Arc::new(RwLock::new(false)),
})
}
// Тихая версия сервера - БЕЗ ВЫВОДА В КОНСОЛЬ
pub async fn run_silent(&self) -> Result<(), FutrumError> {
use tokio::net::TcpListener;
let listener = TcpListener::bind(self.address).await
.map_err(|e: std::io::Error| FutrumError::NetworkError(e.to_string()))?;
let should_stop = self.should_stop.clone();
tokio::spawn(async move {
while !*should_stop.read().await {
match listener.accept().await {
Ok((socket, _addr)) => {
// Минимальная обработка соединения
tokio::spawn(async move {
let _ = socket;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
});
}
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}
}
});
Ok(())
}
pub async fn stop(&self) {
*self.should_stop.write().await = true;
}
}
impl Clone for Server {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
consensus: self.consensus.clone(),
command_history: self.command_history.clone(),
address: self.address,
should_stop: self.should_stop.clone(),
}
}
}

100
src/replication.rs Normal file
View File

@ -0,0 +1,100 @@
// src/replication.rs
use crate::{FutrumError, Result};
use dashmap::DashMap;
use serde_json::Value;
use std::sync::Arc;
use chrono::{DateTime, Utc};
#[derive(Debug, Clone)]
pub struct ReplicationRecord {
pub key: String,
pub value: Value,
pub timestamp: DateTime<Utc>,
pub node_id: String,
pub version: u64,
}
pub struct MultiMasterReplication {
enabled: bool,
node_id: String,
peers: Vec<String>,
pending_replication: Arc<DashMap<String, ReplicationRecord>>,
sync_interval: std::time::Duration,
}
impl MultiMasterReplication {
pub fn new(
enabled: bool,
node_id: String,
peers: Vec<String>,
sync_interval_ms: u64
) -> Self {
Self {
enabled,
node_id,
peers,
pending_replication: Arc::new(DashMap::new()),
sync_interval: std::time::Duration::from_millis(sync_interval_ms),
}
}
pub async fn queue_replication(
&self,
collection: &str,
key: &str,
value: Value
) -> Result<()> {
if !self.enabled {
return Ok(());
}
let record = ReplicationRecord {
key: format!("{}:{}", collection, key),
value,
timestamp: Utc::now(),
node_id: self.node_id.clone(),
version: 1,
};
self.pending_replication.insert(record.key.clone(), record);
Ok(())
}
pub async fn start_replication_worker(&self) -> Result<()> {
if !self.enabled {
return Ok(());
}
let pending = self.pending_replication.clone();
let peers = self.peers.clone();
let sync_interval = self.sync_interval;
tokio::spawn(async move {
loop {
tokio::time::sleep(sync_interval).await;
// Упрощенная имитация репликации
for entry in pending.iter() {
let record = entry.value();
// Здесь была бы реальная отправка на пиры
println!("Replicating {} to {} peers", record.key, peers.len());
}
// Очищаем после "репликации"
pending.clear();
}
});
Ok(())
}
pub fn apply_replication(&self, record: ReplicationRecord) -> Result<()> {
if !self.enabled {
return Ok(());
}
// Применяем реплицированные изменения
println!("Applied replication from {}: {}", record.node_id, record.key);
Ok(())
}
}

74
src/storage/document.rs Normal file
View File

@ -0,0 +1,74 @@
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use chrono::{DateTime, Utc};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Document {
pub id: String,
pub collection: String,
pub data: serde_json::Value,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub version: u64,
}
impl Document {
pub fn new(collection: &str, id: &str, data: serde_json::Value) -> Self {
let now = Utc::now();
Self {
id: id.to_string(),
collection: collection.to_string(),
data,
created_at: now,
updated_at: now,
version: 1,
}
}
pub fn update(&mut self, data: serde_json::Value) {
self.data = data;
self.updated_at = Utc::now();
self.version += 1;
}
}
#[derive(Default)]
pub struct DocumentCollection {
documents: HashMap<String, Document>,
}
impl DocumentCollection {
pub fn new() -> Self {
Self {
documents: HashMap::new(),
}
}
pub fn insert(&mut self, document: Document) -> Option<Document> {
self.documents.insert(document.id.clone(), document)
}
pub fn get(&self, id: &str) -> Option<&Document> {
self.documents.get(id)
}
pub fn get_mut(&mut self, id: &str) -> Option<&mut Document> {
self.documents.get_mut(id)
}
pub fn remove(&mut self, id: &str) -> Option<Document> {
self.documents.remove(id)
}
pub fn contains(&self, id: &str) -> bool {
self.documents.contains_key(id)
}
pub fn len(&self) -> usize {
self.documents.len()
}
pub fn is_empty(&self) -> bool {
self.documents.is_empty()
}
}

120
src/storage/engine.rs Normal file
View File

@ -0,0 +1,120 @@
use super::document::{Document, DocumentCollection};
use super::wal::WriteAheadLog;
use crate::FutrumError;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct StorageEngine {
collections: Arc<DashMap<String, DocumentCollection>>,
wal: RwLock<WriteAheadLog>,
}
impl StorageEngine {
pub fn new() -> Result<Self, FutrumError> {
let wal = WriteAheadLog::new("futrum.wal")
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
Ok(Self {
collections: Arc::new(DashMap::new()),
wal: RwLock::new(wal),
})
}
pub async fn create(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<(), FutrumError> {
let value_clone = value.clone(); // Клонируем значение для WAL
let document = Document::new(collection, key, value);
let mut collection_map = self.collections
.entry(collection.to_string())
.or_insert_with(DocumentCollection::new);
if collection_map.contains(key) {
return Err(FutrumError::StorageError("Key already exists".to_string()));
}
collection_map.insert(document);
// Write to WAL
let entry = super::wal::WALEntry::Create {
collection: collection.to_string(),
key: key.to_string(),
value: value_clone, // Используем клонированное значение
timestamp: chrono::Utc::now().timestamp_millis() as u64,
};
self.wal.write().await.write(&entry)
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
Ok(())
}
pub async fn read(&self, collection: &str, key: &str) -> Result<Option<serde_json::Value>, FutrumError> {
let collection_map = self.collections.get(collection);
match collection_map {
Some(col) => {
Ok(col.get(key).map(|doc| doc.data.clone()))
}
None => Ok(None),
}
}
pub async fn update(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<(), FutrumError> {
let value_clone = value.clone(); // Клонируем значение для WAL
let mut collection_map = self.collections
.entry(collection.to_string())
.or_insert_with(DocumentCollection::new);
match collection_map.get_mut(key) {
Some(document) => {
document.update(value);
// Write to WAL
let entry = super::wal::WALEntry::Update {
collection: collection.to_string(),
key: key.to_string(),
value: value_clone, // Используем клонированное значение
timestamp: chrono::Utc::now().timestamp_millis() as u64,
};
self.wal.write().await.write(&entry)
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
Ok(())
}
None => Err(FutrumError::KeyNotFound(key.to_string())),
}
}
pub async fn delete(&self, collection: &str, key: &str) -> Result<(), FutrumError> {
let mut collection_map = self.collections
.entry(collection.to_string())
.or_insert_with(DocumentCollection::new);
if collection_map.remove(key).is_some() {
// Write to WAL
let entry = super::wal::WALEntry::Delete {
collection: collection.to_string(),
key: key.to_string(),
timestamp: chrono::Utc::now().timestamp_millis() as u64,
};
self.wal.write().await.write(&entry)
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
Ok(())
} else {
Err(FutrumError::KeyNotFound(key.to_string()))
}
}
pub async fn list_collections(&self) -> Vec<String> {
self.collections.iter().map(|entry| entry.key().clone()).collect()
}
pub async fn collection_exists(&self, collection: &str) -> bool {
self.collections.contains_key(collection)
}
}

7
src/storage/mod.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod document;
pub mod wal;
pub mod engine;
pub use document::Document;
pub use wal::WriteAheadLog;
pub use engine::StorageEngine;

102
src/storage/wal.rs Normal file
View File

@ -0,0 +1,102 @@
use serde::{Serialize, Deserialize};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write, BufReader, Seek, SeekFrom, BufRead}; // Добавлен BufRead
use std::path::Path;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum WALError {
#[error("IO error: {0}")]
IOError(#[from] std::io::Error),
#[error("Serialization error: {0}")]
SerializationError(#[from] rmp_serde::encode::Error),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum WALEntry {
Create {
collection: String,
key: String,
value: serde_json::Value,
timestamp: u64,
},
Update {
collection: String,
key: String,
value: serde_json::Value,
timestamp: u64,
},
Delete {
collection: String,
key: String,
timestamp: u64,
},
Commit {
transaction_id: String,
timestamp: u64,
},
Rollback {
transaction_id: String,
timestamp: u64,
},
}
impl WALEntry {
pub fn timestamp(&self) -> u64 {
match self {
WALEntry::Create { timestamp, .. } => *timestamp,
WALEntry::Update { timestamp, .. } => *timestamp,
WALEntry::Delete { timestamp, .. } => *timestamp,
WALEntry::Commit { timestamp, .. } => *timestamp,
WALEntry::Rollback { timestamp, .. } => *timestamp,
}
}
}
pub struct WriteAheadLog {
file: BufWriter<File>,
path: String,
}
impl WriteAheadLog {
pub fn new(path: &str) -> Result<Self, WALError> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
Ok(Self {
file: BufWriter::new(file),
path: path.to_string(),
})
}
pub fn write(&mut self, entry: &WALEntry) -> Result<(), WALError> {
let serialized = rmp_serde::to_vec(entry)?;
self.file.write_all(&serialized)?;
self.file.write_all(b"\n")?;
self.file.flush()?;
Ok(())
}
pub fn replay(&self) -> Result<Vec<WALEntry>, WALError> {
let file = File::open(&self.path)?;
let reader = BufReader::new(file);
let mut entries = Vec::new();
for line in reader.lines() {
let line = line?;
if let Ok(entry) = rmp_serde::from_slice(line.as_bytes()) {
entries.push(entry);
}
}
Ok(entries)
}
pub fn clear(&mut self) -> Result<(), WALError> {
self.file.seek(SeekFrom::Start(0))?;
self.file.get_mut().set_len(0)?;
Ok(())
}
}

115
src/transaction/manager.rs Normal file
View File

@ -0,0 +1,115 @@
use crate::storage::StorageEngine;
use crate::FutrumError;
use std::collections::HashMap;
use uuid::Uuid;
pub struct Transaction {
pub id: String,
pub operations: Vec<TransactionOperation>,
pub state: TransactionState,
}
pub enum TransactionOperation {
Create {
collection: String,
key: String,
value: serde_json::Value,
},
Update {
collection: String,
key: String,
value: serde_json::Value,
},
Delete {
collection: String,
key: String,
},
}
#[derive(PartialEq)]
pub enum TransactionState {
Active,
Committed,
RolledBack,
}
pub struct TransactionManager {
storage: StorageEngine,
active_transactions: HashMap<String, Transaction>,
}
impl TransactionManager {
pub fn new(storage: StorageEngine) -> Self {
Self {
storage,
active_transactions: HashMap::new(),
}
}
pub fn begin(&mut self) -> String {
let transaction_id = Uuid::new_v4().to_string();
let transaction = Transaction {
id: transaction_id.clone(),
operations: Vec::new(),
state: TransactionState::Active,
};
self.active_transactions.insert(transaction_id.clone(), transaction);
transaction_id
}
pub fn add_operation(&mut self, transaction_id: &str, operation: TransactionOperation) -> Result<(), FutrumError> {
if let Some(transaction) = self.active_transactions.get_mut(transaction_id) {
if transaction.state != TransactionState::Active {
return Err(FutrumError::TransactionError("Transaction is not active".to_string()));
}
transaction.operations.push(operation);
Ok(())
} else {
Err(FutrumError::TransactionError("Transaction not found".to_string()))
}
}
pub async fn commit(&mut self, transaction_id: &str) -> Result<(), FutrumError> {
if let Some(transaction) = self.active_transactions.get_mut(transaction_id) {
if transaction.state != TransactionState::Active {
return Err(FutrumError::TransactionError("Transaction is not active".to_string()));
}
// Apply all operations
for operation in &transaction.operations {
match operation {
TransactionOperation::Create { collection, key, value } => {
self.storage.create(collection, key, value.clone()).await?;
}
TransactionOperation::Update { collection, key, value } => {
self.storage.update(collection, key, value.clone()).await?;
}
TransactionOperation::Delete { collection, key } => {
self.storage.delete(collection, key).await?;
}
}
}
transaction.state = TransactionState::Committed;
self.active_transactions.remove(transaction_id);
Ok(())
} else {
Err(FutrumError::TransactionError("Transaction not found".to_string()))
}
}
pub fn rollback(&mut self, transaction_id: &str) -> Result<(), FutrumError> {
if let Some(transaction) = self.active_transactions.get_mut(transaction_id) {
if transaction.state != TransactionState::Active {
return Err(FutrumError::TransactionError("Transaction is not active".to_string()));
}
transaction.state = TransactionState::RolledBack;
self.active_transactions.remove(transaction_id);
Ok(())
} else {
Err(FutrumError::TransactionError("Transaction not found".to_string()))
}
}
}

3
src/transaction/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod manager;
pub use manager::TransactionManager;

55
tests/integration_test.rs Normal file
View File

@ -0,0 +1,55 @@
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_storage_operations() {
let db = FutrumDB::new("test.config").unwrap();
// Test CRUD operations
let value = serde_json::json!({"test": "value"});
// Create
assert!(db.create_document("test", "key1", value.clone()).await.is_ok());
// Read
let result = db.read_document("test", "key1").await.unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap(), value);
// Update
let new_value = serde_json::json!({"test": "updated"});
assert!(db.update_document("test", "key1", new_value.clone()).await.is_ok());
// Delete
assert!(db.delete_document("test", "key1").await.is_ok());
assert!(db.read_document("test", "key1").await.unwrap().is_none());
}
#[tokio::test]
async fn test_transactions() {
let db = FutrumDB::new("test.config").unwrap();
// This would test transaction functionality
// Implementation depends on TransactionManager integration
}
#[test]
fn test_command_history() {
let mut history = CommandHistory::new();
history.add_entry(
"CREATE".to_string(),
serde_json::json!({"key": "test"}),
true
);
assert_eq!(history.len(), 1);
assert!(!history.is_empty());
let recent = history.get_recent(5);
assert_eq!(recent.len(), 1);
assert_eq!(recent[0].command, "CREATE");
}
}