this is the victory
This commit is contained in:
commit
030828df31
2372
Cargo.lock
generated
Normal file
2372
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
48
Cargo.toml
Normal file
48
Cargo.toml
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
[package]
|
||||||
|
name = "futriix"
|
||||||
|
version = "0.3.0"
|
||||||
|
edition = "2024"
|
||||||
|
authors = ["Futriix Developer"]
|
||||||
|
description = "Application server with embedded Lua interpreter and Futrum database"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
rmp-serde = "1.1"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
tokio = { version = "1.0", features = ["full"] }
|
||||||
|
ansi_term = "0.12"
|
||||||
|
hashbrown = "0.13"
|
||||||
|
arc-swap = "1.0"
|
||||||
|
dashmap = "5.0"
|
||||||
|
thiserror = "1.0"
|
||||||
|
log = "0.4"
|
||||||
|
env_logger = "0.10"
|
||||||
|
bytes = "1.0"
|
||||||
|
uuid = { version = "1.0", features = ["v4"] }
|
||||||
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
|
async-trait = "0.1"
|
||||||
|
rand = "0.8"
|
||||||
|
toml = "0.8"
|
||||||
|
config = "0.13"
|
||||||
|
simplelog = "0.12"
|
||||||
|
time = "0.3"
|
||||||
|
rlua = "0.20.1"
|
||||||
|
hyper = { version = "1.0", features = ["full"] }
|
||||||
|
hyper-tls = "0.6"
|
||||||
|
tokio-rustls = "0.25"
|
||||||
|
rustls = "0.23.32"
|
||||||
|
rustls-pemfile = "2.0"
|
||||||
|
http-body-util = "0.1"
|
||||||
|
tower = "0.4"
|
||||||
|
tower-service = "0.3"
|
||||||
|
mime = "0.3"
|
||||||
|
walkdir = "2.5"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tempfile = "3.0"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
multi-master = []
|
||||||
|
http2 = []
|
||||||
|
https = []
|
42
config.toml
Normal file
42
config.toml
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
# Futriix Application Server Configuration
|
||||||
|
|
||||||
|
[server]
|
||||||
|
port = 8080
|
||||||
|
host = "127.0.0.1"
|
||||||
|
enable_http2 = true
|
||||||
|
enable_https = false
|
||||||
|
ssl_cert_path = "./certs/cert.pem"
|
||||||
|
ssl_key_path = "./certs/key.pem"
|
||||||
|
static_files_dir = "./static"
|
||||||
|
lua_scripts_dir = "./lua-scripts"
|
||||||
|
|
||||||
|
[application]
|
||||||
|
name = "futriix"
|
||||||
|
version = "0.3.0"
|
||||||
|
max_file_size_mb = 10
|
||||||
|
supported_formats = ["pdf", "txt", "html", "css", "md", "mp3", "mp4", "js", "lua"]
|
||||||
|
|
||||||
|
[database]
|
||||||
|
enabled = true
|
||||||
|
port = 8080
|
||||||
|
host = "127.0.0.1"
|
||||||
|
|
||||||
|
[raft]
|
||||||
|
enabled = true
|
||||||
|
election_timeout_ms = 150
|
||||||
|
heartbeat_interval_ms = 50
|
||||||
|
cluster_nodes = ["127.0.0.1:8080", "127.0.0.1:8081", "127.0.0.1:8082"]
|
||||||
|
|
||||||
|
[replication]
|
||||||
|
multi_master_enabled = false
|
||||||
|
sync_interval_ms = 100
|
||||||
|
conflict_resolution = "last-write-wins"
|
||||||
|
|
||||||
|
[acl]
|
||||||
|
enabled = true
|
||||||
|
admin_users = ["admin"]
|
||||||
|
readonly_users = ["guest"]
|
||||||
|
|
||||||
|
[logging]
|
||||||
|
level = "info"
|
||||||
|
file = "futriix.log"
|
13
futriix.log
Normal file
13
futriix.log
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
[INFO] Starting futriiX application-server
|
||||||
|
[INFO] Initializing application server components
|
||||||
|
[INFO] Creating Lua interpreter
|
||||||
|
[INFO] Starting HTTP server
|
||||||
|
[INFO] HTTP server background task started
|
||||||
|
[INFO] HTTP server started on static directory: ./static
|
||||||
|
[INFO] Starting interactive mode
|
||||||
|
[INFO] Interactive mode started, waiting for user input
|
||||||
|
[INFO] Lua command received: exit
|
||||||
|
[INFO] Exit command received, shutting down
|
||||||
|
[INFO] Interactive mode ended
|
||||||
|
[INFO] Stopping HTTP server
|
||||||
|
[INFO] futriiX application-server stopped
|
0
futrum.wal
Normal file
0
futrum.wal
Normal file
90
src/acl.rs
Normal file
90
src/acl.rs
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
// src/acl.rs
|
||||||
|
use crate::FutrumError;
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub enum Permission {
|
||||||
|
Read,
|
||||||
|
Write,
|
||||||
|
Admin,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct User {
|
||||||
|
pub username: String,
|
||||||
|
pub permissions: Vec<Permission>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ACLManager {
|
||||||
|
users: Arc<DashMap<String, User>>,
|
||||||
|
enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ACLManager {
|
||||||
|
pub fn new(enabled: bool) -> Self {
|
||||||
|
let users = Arc::new(DashMap::new());
|
||||||
|
|
||||||
|
// Добавляем стандартных пользователей
|
||||||
|
if enabled {
|
||||||
|
users.insert("admin".to_string(), User {
|
||||||
|
username: "admin".to_string(),
|
||||||
|
permissions: vec![Permission::Read, Permission::Write, Permission::Admin],
|
||||||
|
});
|
||||||
|
|
||||||
|
users.insert("guest".to_string(), User {
|
||||||
|
username: "guest".to_string(),
|
||||||
|
permissions: vec![Permission::Read],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Self { users, enabled }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check_permission(&self, username: &str, permission: Permission) -> bool {
|
||||||
|
if !self.enabled {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.users.get(username) {
|
||||||
|
Some(user) => user.permissions.contains(&permission),
|
||||||
|
None => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_user(&self, username: String, permissions: Vec<Permission>) -> Result<(), FutrumError> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.users.insert(username.clone(), User { username, permissions });
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_user(&self, username: &str) -> Result<(), FutrumError> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.users.remove(username);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_users(&self) -> Vec<User> {
|
||||||
|
self.users.iter().map(|entry| entry.value().clone()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_user_permissions(&self, username: &str, permissions: Vec<Permission>) -> Result<(), FutrumError> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut user) = self.users.get_mut(username) {
|
||||||
|
user.permissions = permissions;
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(FutrumError::ConfigError(format!("User {} not found", username)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
108
src/application.rs
Normal file
108
src/application.rs
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
// src/application.rs
|
||||||
|
use crate::{FutrumError, Result};
|
||||||
|
use hyper::Response;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ApplicationServer {
|
||||||
|
static_files_dir: String,
|
||||||
|
supported_formats: Vec<String>,
|
||||||
|
max_file_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ApplicationServer {
|
||||||
|
pub fn new(static_files_dir: String, supported_formats: Vec<String>, max_file_size_mb: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
static_files_dir,
|
||||||
|
supported_formats,
|
||||||
|
max_file_size: max_file_size_mb * 1024 * 1024,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn serve_file(&self, path: &str) -> Result<Response<String>> {
|
||||||
|
log::debug!("Serving file: {}", path);
|
||||||
|
|
||||||
|
let file_path = Path::new(&self.static_files_dir).join(path.trim_start_matches('/'));
|
||||||
|
|
||||||
|
// Проверяем безопасность пути
|
||||||
|
if !file_path.starts_with(&self.static_files_dir) {
|
||||||
|
log::warn!("Attempted to access forbidden path: {}", path);
|
||||||
|
return Response::builder()
|
||||||
|
.status(403)
|
||||||
|
.body("403 Forbidden".to_string())
|
||||||
|
.map_err(|e| FutrumError::HttpError(e.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !file_path.exists() {
|
||||||
|
log::warn!("File not found: {}", path);
|
||||||
|
return Response::builder()
|
||||||
|
.status(404)
|
||||||
|
.body("404 Not Found".to_string())
|
||||||
|
.map_err(|e| FutrumError::HttpError(e.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let metadata = fs::metadata(&file_path)
|
||||||
|
.map_err(|e| {
|
||||||
|
log::error!("Failed to get file metadata for {}: {}", path, e);
|
||||||
|
FutrumError::HttpError(e.to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if metadata.len() > self.max_file_size {
|
||||||
|
log::warn!("File too large: {} ({} bytes)", path, metadata.len());
|
||||||
|
return Response::builder()
|
||||||
|
.status(413)
|
||||||
|
.body("413 Payload Too Large".to_string())
|
||||||
|
.map_err(|e| FutrumError::HttpError(e.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let extension = file_path.extension()
|
||||||
|
.and_then(|ext| ext.to_str())
|
||||||
|
.unwrap_or("");
|
||||||
|
|
||||||
|
if !self.supported_formats.contains(&extension.to_string()) {
|
||||||
|
log::warn!("Unsupported file format: {}", extension);
|
||||||
|
return Response::builder()
|
||||||
|
.status(415)
|
||||||
|
.body("415 Unsupported Media Type".to_string())
|
||||||
|
.map_err(|e| FutrumError::HttpError(e.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let content = fs::read_to_string(&file_path)
|
||||||
|
.map_err(|e| {
|
||||||
|
log::error!("Failed to read file {}: {}", path, e);
|
||||||
|
FutrumError::HttpError(e.to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mime_type = self.get_mime_type(extension);
|
||||||
|
|
||||||
|
log::debug!("Successfully served file: {} ({} bytes)", path, content.len());
|
||||||
|
|
||||||
|
Response::builder()
|
||||||
|
.header("Content-Type", mime_type)
|
||||||
|
.body(content)
|
||||||
|
.map_err(|e| FutrumError::HttpError(e.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_mime_type(&self, extension: &str) -> &str {
|
||||||
|
match extension {
|
||||||
|
"html" => "text/html",
|
||||||
|
"css" => "text/css",
|
||||||
|
"js" => "application/javascript",
|
||||||
|
"lua" => "text/plain",
|
||||||
|
"txt" => "text/plain",
|
||||||
|
"md" => "text/markdown",
|
||||||
|
"pdf" => "application/pdf",
|
||||||
|
"mp3" => "audio/mpeg",
|
||||||
|
"mp4" => "video/mp4",
|
||||||
|
_ => "application/octet-stream",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_http_server(&self) -> Result<()> {
|
||||||
|
// Выводим сообщение о запуске HTTP сервера и логируем
|
||||||
|
log::info!("HTTP server started on static directory: {}", self.static_files_dir);
|
||||||
|
println!("HTTP server started on static directory: {}", self.static_files_dir);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
70
src/command_history.rs
Normal file
70
src/command_history.rs
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
// src/command_history.rs
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct CommandEntry {
|
||||||
|
pub timestamp: DateTime<Utc>,
|
||||||
|
pub command: String,
|
||||||
|
pub parameters: serde_json::Value,
|
||||||
|
pub success: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CommandHistory {
|
||||||
|
entries: VecDeque<CommandEntry>,
|
||||||
|
max_size: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CommandHistory {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
entries: VecDeque::new(),
|
||||||
|
max_size: 1000, // Keep last 1000 commands
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_entry(&mut self, command: String, parameters: serde_json::Value, success: bool) {
|
||||||
|
let entry = CommandEntry {
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
command,
|
||||||
|
parameters,
|
||||||
|
success,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.entries.push_back(entry);
|
||||||
|
|
||||||
|
// Remove oldest entries if we exceed max size
|
||||||
|
while self.entries.len() > self.max_size {
|
||||||
|
self.entries.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_recent(&self, count: usize) -> Vec<&CommandEntry> {
|
||||||
|
let start = if self.entries.len() > count {
|
||||||
|
self.entries.len() - count
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
|
||||||
|
self.entries.range(start..).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.entries.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.entries.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.entries.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for CommandHistory {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
39
src/consensus/messages.rs
Normal file
39
src/consensus/messages.rs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct VoteRequest {
|
||||||
|
pub term: u64,
|
||||||
|
pub candidate_id: String,
|
||||||
|
pub last_log_index: u64,
|
||||||
|
pub last_log_term: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct VoteResponse {
|
||||||
|
pub term: u64,
|
||||||
|
pub vote_granted: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct AppendEntriesRequest {
|
||||||
|
pub term: u64,
|
||||||
|
pub leader_id: String,
|
||||||
|
pub prev_log_index: u64,
|
||||||
|
pub prev_log_term: u64,
|
||||||
|
pub entries: Vec<Vec<u8>>,
|
||||||
|
pub leader_commit: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct AppendEntriesResponse {
|
||||||
|
pub term: u64,
|
||||||
|
pub success: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub enum RaftMessage {
|
||||||
|
VoteRequest(VoteRequest),
|
||||||
|
VoteResponse(VoteResponse),
|
||||||
|
AppendEntriesRequest(AppendEntriesRequest),
|
||||||
|
AppendEntriesResponse(AppendEntriesResponse),
|
||||||
|
}
|
7
src/consensus/mod.rs
Normal file
7
src/consensus/mod.rs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
pub mod raft;
|
||||||
|
pub mod state;
|
||||||
|
pub mod messages;
|
||||||
|
|
||||||
|
pub use raft::RaftConsensus;
|
||||||
|
pub use state::{RaftState, NodeState};
|
||||||
|
pub use messages::{RaftMessage, AppendEntriesRequest, AppendEntriesResponse};
|
132
src/consensus/raft.rs
Normal file
132
src/consensus/raft.rs
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
// src/consensus/raft.rs
|
||||||
|
use super::state::{RaftState, NodeState};
|
||||||
|
use crate::FutrumError;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
use tokio::time::Duration;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use rand;
|
||||||
|
|
||||||
|
pub struct RaftConsensus {
|
||||||
|
state: Arc<RwLock<RaftState>>,
|
||||||
|
node_id: String,
|
||||||
|
peers: Vec<String>,
|
||||||
|
election_timeout: Duration,
|
||||||
|
heartbeat_interval: Duration,
|
||||||
|
enabled: bool,
|
||||||
|
should_stop: Arc<RwLock<bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RaftConsensus {
|
||||||
|
pub fn new(enabled: bool, election_timeout_ms: u64, heartbeat_interval_ms: u64) -> Result<Self, FutrumError> {
|
||||||
|
let node_id = Uuid::new_v4().to_string();
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
state: Arc::new(RwLock::new(RaftState::new(node_id.clone()))),
|
||||||
|
node_id,
|
||||||
|
peers: Vec::new(),
|
||||||
|
election_timeout: Duration::from_millis(election_timeout_ms),
|
||||||
|
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
|
||||||
|
enabled,
|
||||||
|
should_stop: Arc::new(RwLock::new(false)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Тихая версия запуска - БЕЗ ВЫВОДА В КОНСОЛЬ
|
||||||
|
pub async fn start_silent(&self) -> Result<(), FutrumError> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = self.state.clone();
|
||||||
|
let node_id = self.node_id.clone();
|
||||||
|
let should_stop = self.should_stop.clone();
|
||||||
|
let election_timeout = self.election_timeout;
|
||||||
|
let heartbeat_interval = self.heartbeat_interval;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut iteration_count = 0;
|
||||||
|
|
||||||
|
while !*should_stop.read().await {
|
||||||
|
iteration_count += 1;
|
||||||
|
|
||||||
|
// ОГРАНИЧИВАЕМ активность - работаем только каждые 100 итераций
|
||||||
|
if iteration_count % 100 != 0 {
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let current_state = { state.read().await.current_state };
|
||||||
|
|
||||||
|
match current_state {
|
||||||
|
NodeState::Follower => {
|
||||||
|
if Self::should_start_election_silent(&should_stop).await && !*should_stop.read().await {
|
||||||
|
state.write().await.current_state = NodeState::Candidate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
NodeState::Candidate => {
|
||||||
|
if !*should_stop.read().await {
|
||||||
|
let _ = Self::start_election_silent(state.clone()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
NodeState::Leader => {
|
||||||
|
// Минимальная активность в режиме лидера
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn should_start_election_silent(should_stop: &Arc<RwLock<bool>>) -> bool {
|
||||||
|
// Увеличиваем таймаут для уменьшения активности
|
||||||
|
let timeout = rand::random::<u64>() % 1000 + 500; // 500-1500ms вместо 150-450ms
|
||||||
|
tokio::time::sleep(Duration::from_millis(timeout.min(100))).await;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_election_silent(state: Arc<RwLock<RaftState>>) -> Result<(), FutrumError> {
|
||||||
|
let mut state_guard = state.write().await;
|
||||||
|
state_guard.current_term += 1;
|
||||||
|
state_guard.voted_for = Some(state_guard.node_id.clone());
|
||||||
|
state_guard.vote_count = 1;
|
||||||
|
|
||||||
|
if state_guard.vote_count > 0 {
|
||||||
|
state_guard.current_state = NodeState::Leader;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn is_leader(&self) -> bool {
|
||||||
|
if !self.enabled {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.state.read().await.current_state == NodeState::Leader
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_current_term(&self) -> u64 {
|
||||||
|
self.state.read().await.current_term
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stop(&self) {
|
||||||
|
*self.should_stop.write().await = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for RaftConsensus {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
state: self.state.clone(),
|
||||||
|
node_id: self.node_id.clone(),
|
||||||
|
peers: self.peers.clone(),
|
||||||
|
election_timeout: self.election_timeout,
|
||||||
|
heartbeat_interval: self.heartbeat_interval,
|
||||||
|
enabled: self.enabled,
|
||||||
|
should_stop: self.should_stop.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
41
src/consensus/state.rs
Normal file
41
src/consensus/state.rs
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||||
|
pub enum NodeState {
|
||||||
|
Follower,
|
||||||
|
Candidate,
|
||||||
|
Leader,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct LogEntry {
|
||||||
|
pub term: u64,
|
||||||
|
pub index: u64,
|
||||||
|
pub command: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RaftState {
|
||||||
|
pub node_id: String,
|
||||||
|
pub current_term: u64,
|
||||||
|
pub voted_for: Option<String>,
|
||||||
|
pub vote_count: u64,
|
||||||
|
pub current_state: NodeState,
|
||||||
|
pub log: Vec<LogEntry>,
|
||||||
|
pub commit_index: u64,
|
||||||
|
pub last_applied: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RaftState {
|
||||||
|
pub fn new(node_id: String) -> Self {
|
||||||
|
Self {
|
||||||
|
node_id,
|
||||||
|
current_term: 0,
|
||||||
|
voted_for: None,
|
||||||
|
vote_count: 0,
|
||||||
|
current_state: NodeState::Follower,
|
||||||
|
log: Vec::new(),
|
||||||
|
commit_index: 0,
|
||||||
|
last_applied: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
531
src/lib.rs
Normal file
531
src/lib.rs
Normal file
@ -0,0 +1,531 @@
|
|||||||
|
// src/lib.rs
|
||||||
|
pub mod storage;
|
||||||
|
pub mod consensus;
|
||||||
|
pub mod network;
|
||||||
|
pub mod transaction;
|
||||||
|
pub mod command_history;
|
||||||
|
pub mod replication;
|
||||||
|
pub mod acl;
|
||||||
|
pub mod application;
|
||||||
|
pub mod lua_interpreter;
|
||||||
|
|
||||||
|
use ansi_term::Colour::White;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::{RwLock, mpsc, Mutex};
|
||||||
|
use thiserror::Error;
|
||||||
|
use config::Config;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
use storage::StorageEngine;
|
||||||
|
use consensus::RaftConsensus;
|
||||||
|
use network::Server;
|
||||||
|
use acl::ACLManager;
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum FutrumError {
|
||||||
|
#[error("Key not found: {0}")]
|
||||||
|
KeyNotFound(String),
|
||||||
|
#[error("Storage error: {0}")]
|
||||||
|
StorageError(String),
|
||||||
|
#[error("Consensus error: {0}")]
|
||||||
|
ConsensusError(String),
|
||||||
|
#[error("Network error: {0}")]
|
||||||
|
NetworkError(String),
|
||||||
|
#[error("Transaction error: {0}")]
|
||||||
|
TransactionError(String),
|
||||||
|
#[error("Serialization error: {0}")]
|
||||||
|
SerializationError(String),
|
||||||
|
#[error("Configuration error: {0}")]
|
||||||
|
ConfigError(String),
|
||||||
|
#[error("Permission denied: {0}")]
|
||||||
|
PermissionDenied(String),
|
||||||
|
#[error("Lua error: {0}")]
|
||||||
|
LuaError(String),
|
||||||
|
#[error("HTTP error: {0}")]
|
||||||
|
HttpError(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, FutrumError>;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum Command {
|
||||||
|
Shutdown,
|
||||||
|
Status,
|
||||||
|
SwitchToDatabase,
|
||||||
|
SwitchToLua,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct FutrumConfig {
|
||||||
|
pub server_port: u16,
|
||||||
|
pub server_host: String,
|
||||||
|
pub raft_enabled: bool,
|
||||||
|
pub election_timeout_ms: u64,
|
||||||
|
pub heartbeat_interval_ms: u64,
|
||||||
|
pub acl_enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for FutrumConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
server_port: 8080,
|
||||||
|
server_host: "127.0.0.1".to_string(),
|
||||||
|
raft_enabled: true,
|
||||||
|
election_timeout_ms: 150,
|
||||||
|
heartbeat_interval_ms: 50,
|
||||||
|
acl_enabled: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FutrumConfig {
|
||||||
|
pub fn from_file(path: &str) -> Result<Self> {
|
||||||
|
let settings = Config::builder()
|
||||||
|
.add_source(config::File::with_name(path))
|
||||||
|
.build()
|
||||||
|
.map_err(|e| FutrumError::ConfigError(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
server_port: settings.get::<u16>("server.port")
|
||||||
|
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
|
||||||
|
server_host: settings.get::<String>("server.host")
|
||||||
|
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
|
||||||
|
raft_enabled: settings.get::<bool>("raft.enabled")
|
||||||
|
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
|
||||||
|
election_timeout_ms: settings.get::<u64>("raft.election_timeout_ms")
|
||||||
|
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
|
||||||
|
heartbeat_interval_ms: settings.get::<u64>("raft.heartbeat_interval_ms")
|
||||||
|
.map_err(|e| FutrumError::ConfigError(e.to_string()))?,
|
||||||
|
acl_enabled: settings.get::<bool>("acl.enabled")
|
||||||
|
.unwrap_or(true),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FutrumDB {
|
||||||
|
storage: Arc<StorageEngine>,
|
||||||
|
consensus: Arc<RaftConsensus>,
|
||||||
|
server: Arc<Server>,
|
||||||
|
config: FutrumConfig,
|
||||||
|
is_running: Arc<Mutex<bool>>,
|
||||||
|
acl_manager: Arc<ACLManager>,
|
||||||
|
current_user: Arc<RwLock<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FutrumDB {
|
||||||
|
pub fn new(config_path: &str) -> Result<Self> {
|
||||||
|
let config = FutrumConfig::from_file(config_path)?;
|
||||||
|
|
||||||
|
let storage = Arc::new(StorageEngine::new()?);
|
||||||
|
// WAL уже включен по умолчанию в StorageEngine::new()
|
||||||
|
|
||||||
|
let acl_manager = Arc::new(ACLManager::new(config.acl_enabled));
|
||||||
|
|
||||||
|
let consensus = Arc::new(RaftConsensus::new(
|
||||||
|
config.raft_enabled,
|
||||||
|
config.election_timeout_ms,
|
||||||
|
config.heartbeat_interval_ms,
|
||||||
|
)?);
|
||||||
|
|
||||||
|
let command_history = Arc::new(RwLock::new(command_history::CommandHistory::new()));
|
||||||
|
|
||||||
|
let server = Server::new(
|
||||||
|
storage.clone(),
|
||||||
|
consensus.clone(),
|
||||||
|
command_history,
|
||||||
|
config.server_host.clone(),
|
||||||
|
config.server_port,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
log::info!("FutrumDB initialized with WAL enabled");
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
storage,
|
||||||
|
consensus,
|
||||||
|
server: Arc::new(server),
|
||||||
|
config,
|
||||||
|
is_running: Arc::new(Mutex::new(false)),
|
||||||
|
acl_manager,
|
||||||
|
current_user: Arc::new(RwLock::new("guest".to_string())),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_current_user(&self, username: String) -> Result<()> {
|
||||||
|
if !self.acl_manager.check_permission(&username, acl::Permission::Read) {
|
||||||
|
return Err(FutrumError::PermissionDenied(format!("User {} has no permissions", username)));
|
||||||
|
}
|
||||||
|
*self.current_user.write().await = username;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_current_user(&self) -> String {
|
||||||
|
self.current_user.read().await.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn check_permission(&self, permission: acl::Permission) -> bool {
|
||||||
|
let user = self.current_user.read().await.clone();
|
||||||
|
self.acl_manager.check_permission(&user, permission)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Создание пространства (коллекции)
|
||||||
|
pub async fn create_space(&self, space_name: &str) -> Result<()> {
|
||||||
|
if !self.check_permission(acl::Permission::Write).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Write permission required".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Создаем инициализационный документ для пространства
|
||||||
|
let init_doc = serde_json::json!({
|
||||||
|
"type": "space",
|
||||||
|
"name": space_name,
|
||||||
|
"created_at": chrono::Utc::now().to_rfc3339(),
|
||||||
|
"status": "active"
|
||||||
|
});
|
||||||
|
|
||||||
|
self.storage.create(space_name, "_space_info", init_doc).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Создание кортежа (документа) в пространстве
|
||||||
|
pub async fn create_tuple(&self, space_name: &str, tuple_id: &str, tuple_data: serde_json::Value) -> Result<()> {
|
||||||
|
if !self.check_permission(acl::Permission::Write).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Write permission required".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Проверяем существование пространства
|
||||||
|
let space_info = self.storage.read(space_name, "_space_info").await?;
|
||||||
|
if space_info.is_none() {
|
||||||
|
return Err(FutrumError::StorageError(format!("Space '{}' does not exist", space_name)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Добавляем метаданные к кортежу
|
||||||
|
let mut tuple_with_meta = tuple_data.as_object().cloned().unwrap_or_default();
|
||||||
|
tuple_with_meta.insert("_tuple_id".to_string(), serde_json::Value::String(tuple_id.to_string()));
|
||||||
|
tuple_with_meta.insert("_space".to_string(), serde_json::Value::String(space_name.to_string()));
|
||||||
|
tuple_with_meta.insert("_created_at".to_string(), serde_json::Value::String(chrono::Utc::now().to_rfc3339()));
|
||||||
|
|
||||||
|
self.storage.create(space_name, tuple_id, serde_json::Value::Object(tuple_with_meta)).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Новая архитектура: полное разделение логики
|
||||||
|
pub async fn run_silent(
|
||||||
|
self,
|
||||||
|
mut command_rx: mpsc::Receiver<Command>,
|
||||||
|
) -> Result<()> {
|
||||||
|
*self.is_running.lock().await = true;
|
||||||
|
|
||||||
|
// Запускаем компоненты в тихом режиме (без вывода в консоль)
|
||||||
|
let server_handle = self.start_server_silent().await?;
|
||||||
|
let raft_handle = self.start_raft_silent().await?;
|
||||||
|
|
||||||
|
// Основной цикл обработки команд
|
||||||
|
while let Some(command) = command_rx.recv().await {
|
||||||
|
match command {
|
||||||
|
Command::Shutdown => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Command::Status => {
|
||||||
|
// Статус обрабатывается через отдельный метод
|
||||||
|
}
|
||||||
|
Command::SwitchToDatabase => {
|
||||||
|
// Переключение в режим базы данных
|
||||||
|
}
|
||||||
|
Command::SwitchToLua => {
|
||||||
|
// Переключение в режим Lua
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Грациозная остановка
|
||||||
|
self.shutdown_silent(server_handle, raft_handle).await;
|
||||||
|
|
||||||
|
*self.is_running.lock().await = false;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_server_silent(&self) -> Result<tokio::task::JoinHandle<()>> {
|
||||||
|
let server = self.server.clone();
|
||||||
|
Ok(tokio::spawn(async move {
|
||||||
|
let _ = server.run_silent().await;
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_raft_silent(&self) -> Result<Option<tokio::task::JoinHandle<()>>> {
|
||||||
|
if !self.config.raft_enabled {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let consensus = self.consensus.clone();
|
||||||
|
Ok(Some(tokio::spawn(async move {
|
||||||
|
let _ = consensus.start_silent().await;
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn shutdown_silent(
|
||||||
|
&self,
|
||||||
|
server_handle: tokio::task::JoinHandle<()>,
|
||||||
|
raft_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
) {
|
||||||
|
// Останавливаем Raft
|
||||||
|
if let Some(handle) = raft_handle {
|
||||||
|
self.consensus.stop().await;
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Останавливаем сервер
|
||||||
|
self.server.stop().await;
|
||||||
|
server_handle.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Метод для получения статуса (вызывается из интерактивного клиента)
|
||||||
|
pub async fn get_status(&self) -> Option<String> {
|
||||||
|
if !*self.is_running.lock().await {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut status = String::new();
|
||||||
|
status.push_str("Database Status:\n");
|
||||||
|
status.push_str(&format!(" Raft enabled: {}\n", self.config.raft_enabled));
|
||||||
|
status.push_str(&format!(" Server port: {}\n", self.config.server_port));
|
||||||
|
status.push_str(&format!(" Current user: {}\n", self.get_current_user().await));
|
||||||
|
status.push_str(&format!(" ACL enabled: {}\n", self.config.acl_enabled));
|
||||||
|
status.push_str(&format!(" WAL enabled: {}\n", self.storage.is_wal_enabled()));
|
||||||
|
|
||||||
|
if self.config.raft_enabled {
|
||||||
|
let is_leader = self.consensus.is_leader().await;
|
||||||
|
let term = self.consensus.get_current_term().await;
|
||||||
|
status.push_str(&format!(" Raft status: {}\n", if is_leader { "LEADER" } else { "FOLLOWER" }));
|
||||||
|
status.push_str(&format!(" Current term: {}\n", term));
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Методы для работы с документами
|
||||||
|
pub async fn create_document(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<()> {
|
||||||
|
if !self.check_permission(acl::Permission::Write).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Write permission required".to_string()));
|
||||||
|
}
|
||||||
|
self.storage.create(collection, key, value).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_document(&self, collection: &str, key: &str) -> Result<Option<serde_json::Value>> {
|
||||||
|
if !self.check_permission(acl::Permission::Read).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Read permission required".to_string()));
|
||||||
|
}
|
||||||
|
self.storage.read(collection, key).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_document(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<()> {
|
||||||
|
if !self.check_permission(acl::Permission::Write).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Write permission required".to_string()));
|
||||||
|
}
|
||||||
|
self.storage.update(collection, key, value).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete_document(&self, collection: &str, key: &str) -> Result<()> {
|
||||||
|
if !self.check_permission(acl::Permission::Write).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Write permission required".to_string()));
|
||||||
|
}
|
||||||
|
self.storage.delete(collection, key).await
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACL команды
|
||||||
|
pub async fn add_user(&self, username: String, permissions: Vec<acl::Permission>) -> Result<String> {
|
||||||
|
if !self.check_permission(acl::Permission::Admin).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Admin permission required".to_string()));
|
||||||
|
}
|
||||||
|
self.acl_manager.add_user(username.clone(), permissions)?;
|
||||||
|
Ok(format!("User {} added successfully", username))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn remove_user(&self, username: &str) -> Result<String> {
|
||||||
|
if !self.check_permission(acl::Permission::Admin).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Admin permission required".to_string()));
|
||||||
|
}
|
||||||
|
self.acl_manager.remove_user(username)?;
|
||||||
|
Ok(format!("User {} removed successfully", username))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_users(&self) -> Result<String> {
|
||||||
|
if !self.check_permission(acl::Permission::Admin).await {
|
||||||
|
return Err(FutrumError::PermissionDenied("Admin permission required".to_string()));
|
||||||
|
}
|
||||||
|
let users = self.acl_manager.list_users();
|
||||||
|
let mut result = String::from("Users:\n");
|
||||||
|
for user in users {
|
||||||
|
result.push_str(&format!(" {}: {:?}\n", user.username, user.permissions));
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Новый метод для обработки CLI команд
|
||||||
|
pub async fn execute_command(&self, command: CliCommand) -> Result<String> {
|
||||||
|
match command {
|
||||||
|
// CRUD команды
|
||||||
|
CliCommand::Create { collection, key, value } => {
|
||||||
|
let value_json: serde_json::Value = serde_json::from_str(&value)
|
||||||
|
.map_err(|e| FutrumError::SerializationError(e.to_string()))?;
|
||||||
|
self.create_document(&collection, &key, value_json).await?;
|
||||||
|
Ok(format!("Document {}/{} created successfully", collection, key))
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::CreateTuple { space, tuple_id, value } => {
|
||||||
|
let value_json: serde_json::Value = serde_json::from_str(&value)
|
||||||
|
.map_err(|e| FutrumError::SerializationError(e.to_string()))?;
|
||||||
|
self.create_tuple(&space, &tuple_id, value_json).await?;
|
||||||
|
Ok(format!("Tuple {} created in space {}", tuple_id, space))
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::CreateSpace { space_name } => {
|
||||||
|
self.create_space(&space_name).await?;
|
||||||
|
Ok(format!("Space '{}' created successfully", space_name))
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::Read { collection, key } => {
|
||||||
|
match self.read_document(&collection, &key).await? {
|
||||||
|
Some(value) => Ok(format!("{}", value)),
|
||||||
|
None => Ok("Document not found".to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::Update { collection, key, value } => {
|
||||||
|
let value_json: serde_json::Value = serde_json::from_str(&value)
|
||||||
|
.map_err(|e| FutrumError::SerializationError(e.to_string()))?;
|
||||||
|
self.update_document(&collection, &key, value_json).await?;
|
||||||
|
Ok(format!("Document {}/{} updated successfully", collection, key))
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::Delete { collection, key } => {
|
||||||
|
self.delete_document(&collection, &key).await?;
|
||||||
|
Ok(format!("Document {}/{} deleted successfully", collection, key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Команды кластера
|
||||||
|
CliCommand::ClusterStatus => {
|
||||||
|
if let Some(status) = self.get_status().await {
|
||||||
|
Ok(status)
|
||||||
|
} else {
|
||||||
|
Ok("Database not running".to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACL команды
|
||||||
|
CliCommand::AddUser { username, permissions } => {
|
||||||
|
let perms: Vec<acl::Permission> = permissions
|
||||||
|
.split(',')
|
||||||
|
.map(|p| match p.trim() {
|
||||||
|
"read" => acl::Permission::Read,
|
||||||
|
"write" => acl::Permission::Write,
|
||||||
|
"admin" => acl::Permission::Admin,
|
||||||
|
_ => acl::Permission::Read,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
self.add_user(username, perms).await
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::RemoveUser { username } => {
|
||||||
|
self.remove_user(&username).await
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::ListUsers => {
|
||||||
|
self.list_users().await
|
||||||
|
}
|
||||||
|
|
||||||
|
CliCommand::Login { username } => {
|
||||||
|
self.set_current_user(username.clone()).await?;
|
||||||
|
Ok(format!("Logged in as {}", username))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Остальные команды пока заглушки
|
||||||
|
_ => Ok("Command not yet implemented".to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for FutrumDB {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
storage: self.storage.clone(),
|
||||||
|
consensus: self.consensus.clone(),
|
||||||
|
server: self.server.clone(),
|
||||||
|
config: self.config.clone(),
|
||||||
|
is_running: self.is_running.clone(),
|
||||||
|
acl_manager: self.acl_manager.clone(),
|
||||||
|
current_user: self.current_user.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CLI структуры и команды
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum CliCommand {
|
||||||
|
/// Start database server
|
||||||
|
Start {
|
||||||
|
daemon: bool,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// CRUD operations
|
||||||
|
Create {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
value: String,
|
||||||
|
},
|
||||||
|
CreateTuple {
|
||||||
|
space: String,
|
||||||
|
tuple_id: String,
|
||||||
|
value: String,
|
||||||
|
},
|
||||||
|
CreateSpace {
|
||||||
|
space_name: String,
|
||||||
|
},
|
||||||
|
Read {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
},
|
||||||
|
Update {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
value: String,
|
||||||
|
},
|
||||||
|
Delete {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// ACL operations
|
||||||
|
Login {
|
||||||
|
username: String,
|
||||||
|
},
|
||||||
|
AddUser {
|
||||||
|
username: String,
|
||||||
|
permissions: String,
|
||||||
|
},
|
||||||
|
RemoveUser {
|
||||||
|
username: String,
|
||||||
|
},
|
||||||
|
ListUsers,
|
||||||
|
|
||||||
|
/// Transaction operations
|
||||||
|
BeginTx,
|
||||||
|
CommitTx {
|
||||||
|
tx_id: String,
|
||||||
|
},
|
||||||
|
RollbackTx {
|
||||||
|
tx_id: String,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Raft cluster operations
|
||||||
|
ClusterStatus,
|
||||||
|
AddNode {
|
||||||
|
node_url: String,
|
||||||
|
},
|
||||||
|
RemoveNode {
|
||||||
|
node_id: String,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Replication operations
|
||||||
|
EnableReplication,
|
||||||
|
DisableReplication,
|
||||||
|
ReplicationStatus,
|
||||||
|
}
|
105
src/lua_interpreter.rs
Normal file
105
src/lua_interpreter.rs
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
// src/lua_interpreter.rs
|
||||||
|
use crate::{FutrumError, Result};
|
||||||
|
use rlua::{Lua, Function};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
|
pub struct LuaInterpreter {
|
||||||
|
lua: Lua,
|
||||||
|
scripts_dir: String,
|
||||||
|
loaded_scripts: Arc<RwLock<Vec<String>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LuaInterpreter {
|
||||||
|
pub fn new(scripts_dir: String) -> Result<Self> {
|
||||||
|
let lua = Lua::new();
|
||||||
|
|
||||||
|
let interpreter = Self {
|
||||||
|
lua,
|
||||||
|
scripts_dir: scripts_dir.clone(),
|
||||||
|
loaded_scripts: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
};
|
||||||
|
|
||||||
|
interpreter.load_scripts()?;
|
||||||
|
Ok(interpreter)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn load_scripts(&self) -> Result<()> {
|
||||||
|
let scripts_dir = self.scripts_dir.clone();
|
||||||
|
|
||||||
|
// Загружаем скрипты без использования scope
|
||||||
|
let lua = &self.lua;
|
||||||
|
|
||||||
|
// Создаем директорию для скриптов, если она не существует
|
||||||
|
let _ = std::fs::create_dir_all(&scripts_dir);
|
||||||
|
|
||||||
|
// Загружаем все Lua скрипты из директории
|
||||||
|
for entry in WalkDir::new(&scripts_dir)
|
||||||
|
.follow_links(true)
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|e| e.ok())
|
||||||
|
{
|
||||||
|
if entry.path().extension().map_or(false, |ext| ext == "lua") {
|
||||||
|
if let Ok(script_content) = std::fs::read_to_string(entry.path()) {
|
||||||
|
if let Err(e) = lua.load(&script_content).exec() {
|
||||||
|
eprintln!("Error loading script {}: {}", entry.path().display(), e);
|
||||||
|
} else {
|
||||||
|
println!("Loaded script: {}", entry.path().display());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Регистрируем глобальные функции
|
||||||
|
let globals = lua.globals();
|
||||||
|
|
||||||
|
// Функция для вывода в консоль
|
||||||
|
let print_fn = lua.create_function(|_, msg: String| {
|
||||||
|
println!("[LUA] {}", msg);
|
||||||
|
Ok(())
|
||||||
|
}).map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
globals.set("print", print_fn)
|
||||||
|
.map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn execute_code(&self, code: &str) -> Result<String> {
|
||||||
|
let result = self.lua.load(code).eval::<String>()
|
||||||
|
.map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Упрощенная версия регистрации функций без сложных времен жизни
|
||||||
|
pub fn register_simple_function<F>(&self, name: &str, func: F) -> Result<()>
|
||||||
|
where
|
||||||
|
F: Fn(String) -> String + 'static,
|
||||||
|
{
|
||||||
|
let lua_func = self.lua.create_function(move |_, arg: String| {
|
||||||
|
Ok(func(arg))
|
||||||
|
}).map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
|
||||||
|
let globals = self.lua.globals();
|
||||||
|
globals.set(name, lua_func)
|
||||||
|
.map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Альтернативный метод для регистрации функций с обработкой ошибок
|
||||||
|
pub fn register_function_with_error<F>(&self, name: &str, func: F) -> Result<()>
|
||||||
|
where
|
||||||
|
F: Fn(String) -> rlua::Result<String> + 'static,
|
||||||
|
{
|
||||||
|
let lua_func = self.lua.create_function(move |_, arg: String| {
|
||||||
|
func(arg)
|
||||||
|
}).map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
|
||||||
|
let globals = self.lua.globals();
|
||||||
|
globals.set(name, lua_func)
|
||||||
|
.map_err(|e| FutrumError::LuaError(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
374
src/main.rs
Normal file
374
src/main.rs
Normal file
@ -0,0 +1,374 @@
|
|||||||
|
// src/main.rs
|
||||||
|
use ansi_term::Colour::{RGB, White, Green, Yellow, Red};
|
||||||
|
use std::env;
|
||||||
|
use std::process;
|
||||||
|
use std::fs::OpenOptions;
|
||||||
|
use simplelog::*;
|
||||||
|
use std::io::{self, Write};
|
||||||
|
use time::macros::format_description;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
use futriix::{FutrumDB, CliCommand, application::ApplicationServer, lua_interpreter::LuaInterpreter};
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
// Инициализация логирования ДО любого вывода
|
||||||
|
let log_file = OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open("futrum.log")
|
||||||
|
.expect("Failed to open log file");
|
||||||
|
|
||||||
|
let time_format = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]");
|
||||||
|
let config = ConfigBuilder::new()
|
||||||
|
.set_time_format_custom(time_format)
|
||||||
|
.set_time_level(LevelFilter::Debug)
|
||||||
|
.set_location_level(LevelFilter::Debug)
|
||||||
|
.set_target_level(LevelFilter::Trace)
|
||||||
|
.add_filter_ignore("hyper".to_string())
|
||||||
|
.add_filter_ignore("tokio".to_string())
|
||||||
|
.add_filter_ignore("mio".to_string())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
CombinedLogger::init(vec![
|
||||||
|
TermLogger::new(
|
||||||
|
LevelFilter::Info,
|
||||||
|
config.clone(),
|
||||||
|
TerminalMode::Mixed,
|
||||||
|
ColorChoice::AlwaysAnsi, // Используем ANSI цвета
|
||||||
|
),
|
||||||
|
WriteLogger::new(LevelFilter::Debug, config, log_file),
|
||||||
|
]).expect("Failed to initialize logger");
|
||||||
|
|
||||||
|
// Логируем запуск приложения
|
||||||
|
log::info!("Starting futriiX application-server");
|
||||||
|
|
||||||
|
// Заголовок приложения
|
||||||
|
println!();
|
||||||
|
println!("{}", RGB(0, 191, 255).paint("futriiX application-server"));
|
||||||
|
println!();
|
||||||
|
|
||||||
|
// Запуск сервера приложений
|
||||||
|
if let Err(e) = run_application_server().await {
|
||||||
|
log::error!("Application server error: {}", e);
|
||||||
|
eprintln!("{}", Red.paint(&format!("Application server error: {}", e)));
|
||||||
|
process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("futriiX application-server stopped");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_application_server() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
log::info!("Initializing application server components");
|
||||||
|
|
||||||
|
// Инициализация компонентов
|
||||||
|
let app_server = ApplicationServer::new(
|
||||||
|
"./static".to_string(),
|
||||||
|
vec!["pdf".to_string(), "txt".to_string(), "html".to_string(), "css".to_string(),
|
||||||
|
"md".to_string(), "mp3".to_string(), "mp4".to_string(), "js".to_string(), "lua".to_string()],
|
||||||
|
10,
|
||||||
|
);
|
||||||
|
|
||||||
|
log::info!("Creating Lua interpreter");
|
||||||
|
let lua_interpreter = LuaInterpreter::new("./lua-scripts".to_string())?;
|
||||||
|
|
||||||
|
// Запуск HTTP сервера в фоне
|
||||||
|
log::info!("Starting HTTP server");
|
||||||
|
let app_server_clone = app_server.clone();
|
||||||
|
let http_handle = tokio::spawn(async move {
|
||||||
|
log::info!("HTTP server background task started");
|
||||||
|
if let Err(e) = app_server_clone.start_http_server().await {
|
||||||
|
log::error!("HTTP server error: {}", e);
|
||||||
|
eprintln!("{}", Red.paint(&format!("HTTP server error: {}", e)));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Даем время HTTP серверу запуститься
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
// Основной интерактивный цикл
|
||||||
|
log::info!("Starting interactive mode");
|
||||||
|
run_interactive_mode(lua_interpreter).await?;
|
||||||
|
|
||||||
|
// Останавливаем HTTP сервер
|
||||||
|
log::info!("Stopping HTTP server");
|
||||||
|
http_handle.abort();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_interactive_mode(lua_interpreter: LuaInterpreter) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let mut current_mode = Mode::Lua;
|
||||||
|
let mut command_history = Vec::new();
|
||||||
|
|
||||||
|
println!("{}", White.paint("Type 'inbox.start' to enter database mode"));
|
||||||
|
log::info!("Interactive mode started, waiting for user input");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match current_mode {
|
||||||
|
Mode::Lua => {
|
||||||
|
// Ярко-зеленый цвет для приглашения lua>
|
||||||
|
print!("{} ", RGB(0, 255, 0).paint("lua>"));
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
|
||||||
|
let mut input = String::new();
|
||||||
|
if io::stdin().read_line(&mut input).is_err() {
|
||||||
|
log::warn!("Failed to read input in Lua mode");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let input = input.trim();
|
||||||
|
if input.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
command_history.push(input.to_string());
|
||||||
|
log::info!("Lua command received: {}", input);
|
||||||
|
|
||||||
|
match input {
|
||||||
|
"exit" => {
|
||||||
|
log::info!("Exit command received, shutting down");
|
||||||
|
println!("{}", White.paint("Shutting down futriiX application server..."));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
"inbox.start" => {
|
||||||
|
log::info!("Switching to database mode");
|
||||||
|
println!(); // Пустая строка перед сообщением
|
||||||
|
println!("{}", RGB(0, 191, 255).paint("Switching to database mode..."));
|
||||||
|
current_mode = Mode::Database;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
log::debug!("Executing Lua code: {}", input);
|
||||||
|
match lua_interpreter.execute_code(input) {
|
||||||
|
Ok(result) if !result.is_empty() => {
|
||||||
|
println!("{}", result);
|
||||||
|
log::debug!("Lua execution result: {}", result);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("{}", Red.paint(&format!("Lua error: {}", e)));
|
||||||
|
log::error!("Lua execution error: {}", e);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
log::debug!("Lua execution completed with empty result");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Mode::Database => {
|
||||||
|
log::info!("Initializing database mode");
|
||||||
|
let db = match FutrumDB::new("config.toml") {
|
||||||
|
Ok(db) => {
|
||||||
|
log::info!("Database initialized successfully");
|
||||||
|
db
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Database initialization failed: {}", e);
|
||||||
|
println!("{}", Red.paint(&format!("Database error: {}", e)));
|
||||||
|
current_mode = Mode::Lua;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (command_tx, command_rx) = mpsc::channel(100);
|
||||||
|
let db_handle = {
|
||||||
|
let db = db.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
log::info!("Database background task started");
|
||||||
|
if let Err(e) = db.run_silent(command_rx).await {
|
||||||
|
log::error!("Database background error: {}", e);
|
||||||
|
eprintln!("Database error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
// Белый цвет для сообщения и пустая строка после
|
||||||
|
println!("{}", White.paint("Database mode activated. Type 'exit' to return to Lua mode."));
|
||||||
|
println!();
|
||||||
|
log::info!("Database mode activated");
|
||||||
|
|
||||||
|
// Цикл режима базы данных
|
||||||
|
loop {
|
||||||
|
print!("{}", RGB(0, 191, 255).paint("futriiX:~> "));
|
||||||
|
io::stdout().flush().unwrap();
|
||||||
|
|
||||||
|
let mut input = String::new();
|
||||||
|
if io::stdin().read_line(&mut input).is_err() {
|
||||||
|
log::warn!("Failed to read input in database mode");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let input = input.trim();
|
||||||
|
if input.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
command_history.push(input.to_string());
|
||||||
|
log::info!("Database command received: {}", input);
|
||||||
|
|
||||||
|
if input == "exit" {
|
||||||
|
log::info!("Exiting database mode");
|
||||||
|
let _ = command_tx.send(futriix::Command::Shutdown).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = match parse_database_command(input) {
|
||||||
|
Ok(command) => {
|
||||||
|
log::debug!("Executing database command: {:?}", command);
|
||||||
|
db.execute_command(command).await
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Invalid database command: {}", e);
|
||||||
|
Err(futriix::FutrumError::ConfigError(e))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(output) => {
|
||||||
|
// Используем заимствование для output чтобы избежать перемещения
|
||||||
|
println!("{}", Green.paint(&output));
|
||||||
|
log::debug!("Database command successful: {}", output);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("{}", Red.paint(format!("Error: {}", e)));
|
||||||
|
log::error!("Database command failed: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||||
|
log::info!("Stopping database background task");
|
||||||
|
db_handle.abort();
|
||||||
|
|
||||||
|
println!("{}", RGB(0, 191, 255).paint("Returning to Lua mode..."));
|
||||||
|
current_mode = Mode::Lua;
|
||||||
|
log::info!("Returned to Lua mode");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("Interactive mode ended");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
enum Mode {
|
||||||
|
Lua,
|
||||||
|
Database,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_database_command(input: &str) -> Result<CliCommand, String> {
|
||||||
|
let parts: Vec<&str> = input.split_whitespace().collect();
|
||||||
|
if parts.is_empty() {
|
||||||
|
return Err("Empty command".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
match parts[0] {
|
||||||
|
"create" => {
|
||||||
|
if parts.len() < 4 {
|
||||||
|
return Err("Usage: create <collection> <key> <json_value>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::Create {
|
||||||
|
collection: parts[1].to_string(),
|
||||||
|
key: parts[2].to_string(),
|
||||||
|
value: parts[3..].join(" "),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"create.tuple" => {
|
||||||
|
if parts.len() < 4 {
|
||||||
|
return Err("Usage: create.tuple <collection> <key> <json_value>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::Create {
|
||||||
|
collection: parts[1].to_string(),
|
||||||
|
key: parts[2].to_string(),
|
||||||
|
value: parts[3..].join(" "),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"create.space" => {
|
||||||
|
if parts.len() < 2 {
|
||||||
|
return Err("Usage: create.space <collection_name>".to_string());
|
||||||
|
}
|
||||||
|
// Создаем пустую коллекцию (пространство) с тестовым документом
|
||||||
|
Ok(CliCommand::Create {
|
||||||
|
collection: parts[1].to_string(),
|
||||||
|
key: "_init".to_string(),
|
||||||
|
value: "{\"type\": \"space\", \"created\": true}".to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"read" => {
|
||||||
|
if parts.len() < 3 {
|
||||||
|
return Err("Usage: read <collection> <key>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::Read {
|
||||||
|
collection: parts[1].to_string(),
|
||||||
|
key: parts[2].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"update" => {
|
||||||
|
if parts.len() < 4 {
|
||||||
|
return Err("Usage: update <collection> <key> <json_value>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::Update {
|
||||||
|
collection: parts[1].to_string(),
|
||||||
|
key: parts[2].to_string(),
|
||||||
|
value: parts[3..].join(" "),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"delete" => {
|
||||||
|
if parts.len() < 3 {
|
||||||
|
return Err("Usage: delete <collection> <key>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::Delete {
|
||||||
|
collection: parts[1].to_string(),
|
||||||
|
key: parts[2].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"status" | "cluster-status" => Ok(CliCommand::ClusterStatus),
|
||||||
|
"login" => {
|
||||||
|
if parts.len() < 2 {
|
||||||
|
return Err("Usage: login <username>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::Login {
|
||||||
|
username: parts[1].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"add-user" => {
|
||||||
|
if parts.len() < 3 {
|
||||||
|
return Err("Usage: add-user <username> <permissions>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::AddUser {
|
||||||
|
username: parts[1].to_string(),
|
||||||
|
permissions: parts[2].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"remove-user" => {
|
||||||
|
if parts.len() < 2 {
|
||||||
|
return Err("Usage: remove-user <username>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::RemoveUser {
|
||||||
|
username: parts[1].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"list-users" => Ok(CliCommand::ListUsers),
|
||||||
|
"add-node" => {
|
||||||
|
if parts.len() < 2 {
|
||||||
|
return Err("Usage: add-node <node_url>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::AddNode {
|
||||||
|
node_url: parts[1].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"remove-node" => {
|
||||||
|
if parts.len() < 2 {
|
||||||
|
return Err("Usage: remove-node <node_id>".to_string());
|
||||||
|
}
|
||||||
|
Ok(CliCommand::RemoveNode {
|
||||||
|
node_id: parts[1].to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ => Err(format!("Unknown command: {}", parts[0])),
|
||||||
|
}
|
||||||
|
}
|
3
src/network/mod.rs
Normal file
3
src/network/mod.rs
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
pub mod server;
|
||||||
|
|
||||||
|
pub use server::Server;
|
81
src/network/server.rs
Normal file
81
src/network/server.rs
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
use crate::storage::StorageEngine;
|
||||||
|
use crate::consensus::RaftConsensus;
|
||||||
|
use crate::command_history::CommandHistory;
|
||||||
|
use crate::FutrumError;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
pub struct Server {
|
||||||
|
storage: Arc<StorageEngine>,
|
||||||
|
consensus: Arc<RaftConsensus>,
|
||||||
|
command_history: Arc<RwLock<CommandHistory>>,
|
||||||
|
address: SocketAddr,
|
||||||
|
should_stop: Arc<RwLock<bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Server {
|
||||||
|
pub fn new(
|
||||||
|
storage: Arc<StorageEngine>,
|
||||||
|
consensus: Arc<RaftConsensus>,
|
||||||
|
command_history: Arc<RwLock<CommandHistory>>,
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
) -> Result<Self, FutrumError> {
|
||||||
|
let address: SocketAddr = format!("{}:{}", host, port).parse()
|
||||||
|
.map_err(|e: std::net::AddrParseError| FutrumError::NetworkError(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
storage,
|
||||||
|
consensus,
|
||||||
|
command_history,
|
||||||
|
address,
|
||||||
|
should_stop: Arc::new(RwLock::new(false)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Тихая версия сервера - БЕЗ ВЫВОДА В КОНСОЛЬ
|
||||||
|
pub async fn run_silent(&self) -> Result<(), FutrumError> {
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
|
let listener = TcpListener::bind(self.address).await
|
||||||
|
.map_err(|e: std::io::Error| FutrumError::NetworkError(e.to_string()))?;
|
||||||
|
|
||||||
|
let should_stop = self.should_stop.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while !*should_stop.read().await {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((socket, _addr)) => {
|
||||||
|
// Минимальная обработка соединения
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = socket;
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn stop(&self) {
|
||||||
|
*self.should_stop.write().await = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for Server {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
storage: self.storage.clone(),
|
||||||
|
consensus: self.consensus.clone(),
|
||||||
|
command_history: self.command_history.clone(),
|
||||||
|
address: self.address,
|
||||||
|
should_stop: self.should_stop.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
100
src/replication.rs
Normal file
100
src/replication.rs
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
// src/replication.rs
|
||||||
|
use crate::{FutrumError, Result};
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use serde_json::Value;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ReplicationRecord {
|
||||||
|
pub key: String,
|
||||||
|
pub value: Value,
|
||||||
|
pub timestamp: DateTime<Utc>,
|
||||||
|
pub node_id: String,
|
||||||
|
pub version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MultiMasterReplication {
|
||||||
|
enabled: bool,
|
||||||
|
node_id: String,
|
||||||
|
peers: Vec<String>,
|
||||||
|
pending_replication: Arc<DashMap<String, ReplicationRecord>>,
|
||||||
|
sync_interval: std::time::Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MultiMasterReplication {
|
||||||
|
pub fn new(
|
||||||
|
enabled: bool,
|
||||||
|
node_id: String,
|
||||||
|
peers: Vec<String>,
|
||||||
|
sync_interval_ms: u64
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
enabled,
|
||||||
|
node_id,
|
||||||
|
peers,
|
||||||
|
pending_replication: Arc::new(DashMap::new()),
|
||||||
|
sync_interval: std::time::Duration::from_millis(sync_interval_ms),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn queue_replication(
|
||||||
|
&self,
|
||||||
|
collection: &str,
|
||||||
|
key: &str,
|
||||||
|
value: Value
|
||||||
|
) -> Result<()> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let record = ReplicationRecord {
|
||||||
|
key: format!("{}:{}", collection, key),
|
||||||
|
value,
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
node_id: self.node_id.clone(),
|
||||||
|
version: 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.pending_replication.insert(record.key.clone(), record);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_replication_worker(&self) -> Result<()> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let pending = self.pending_replication.clone();
|
||||||
|
let peers = self.peers.clone();
|
||||||
|
let sync_interval = self.sync_interval;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(sync_interval).await;
|
||||||
|
|
||||||
|
// Упрощенная имитация репликации
|
||||||
|
for entry in pending.iter() {
|
||||||
|
let record = entry.value();
|
||||||
|
// Здесь была бы реальная отправка на пиры
|
||||||
|
println!("Replicating {} to {} peers", record.key, peers.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Очищаем после "репликации"
|
||||||
|
pending.clear();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn apply_replication(&self, record: ReplicationRecord) -> Result<()> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Применяем реплицированные изменения
|
||||||
|
println!("Applied replication from {}: {}", record.node_id, record.key);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
74
src/storage/document.rs
Normal file
74
src/storage/document.rs
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct Document {
|
||||||
|
pub id: String,
|
||||||
|
pub collection: String,
|
||||||
|
pub data: serde_json::Value,
|
||||||
|
pub created_at: DateTime<Utc>,
|
||||||
|
pub updated_at: DateTime<Utc>,
|
||||||
|
pub version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Document {
|
||||||
|
pub fn new(collection: &str, id: &str, data: serde_json::Value) -> Self {
|
||||||
|
let now = Utc::now();
|
||||||
|
Self {
|
||||||
|
id: id.to_string(),
|
||||||
|
collection: collection.to_string(),
|
||||||
|
data,
|
||||||
|
created_at: now,
|
||||||
|
updated_at: now,
|
||||||
|
version: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update(&mut self, data: serde_json::Value) {
|
||||||
|
self.data = data;
|
||||||
|
self.updated_at = Utc::now();
|
||||||
|
self.version += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct DocumentCollection {
|
||||||
|
documents: HashMap<String, Document>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DocumentCollection {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
documents: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert(&mut self, document: Document) -> Option<Document> {
|
||||||
|
self.documents.insert(document.id.clone(), document)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, id: &str) -> Option<&Document> {
|
||||||
|
self.documents.get(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_mut(&mut self, id: &str) -> Option<&mut Document> {
|
||||||
|
self.documents.get_mut(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove(&mut self, id: &str) -> Option<Document> {
|
||||||
|
self.documents.remove(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains(&self, id: &str) -> bool {
|
||||||
|
self.documents.contains_key(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.documents.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.documents.is_empty()
|
||||||
|
}
|
||||||
|
}
|
149
src/storage/engine.rs
Normal file
149
src/storage/engine.rs
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
// src/storage/engine.rs
|
||||||
|
use super::document::{Document, DocumentCollection};
|
||||||
|
use super::wal::WriteAheadLog;
|
||||||
|
use crate::FutrumError;
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
pub struct StorageEngine {
|
||||||
|
collections: Arc<DashMap<String, DocumentCollection>>,
|
||||||
|
wal: RwLock<WriteAheadLog>,
|
||||||
|
wal_enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StorageEngine {
|
||||||
|
pub fn new() -> Result<Self, FutrumError> {
|
||||||
|
let wal = WriteAheadLog::new("futrum.wal")
|
||||||
|
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
|
||||||
|
|
||||||
|
log::info!("Write-Ahead Log initialized at futrum.wal");
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
collections: Arc::new(DashMap::new()),
|
||||||
|
wal: RwLock::new(wal),
|
||||||
|
wal_enabled: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<(), FutrumError> {
|
||||||
|
let value_clone = value.clone();
|
||||||
|
let document = Document::new(collection, key, value);
|
||||||
|
|
||||||
|
let mut collection_map = self.collections
|
||||||
|
.entry(collection.to_string())
|
||||||
|
.or_insert_with(DocumentCollection::new);
|
||||||
|
|
||||||
|
if collection_map.contains(key) {
|
||||||
|
return Err(FutrumError::StorageError("Key already exists".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
collection_map.insert(document);
|
||||||
|
|
||||||
|
// Write to WAL
|
||||||
|
if self.wal_enabled {
|
||||||
|
let entry = super::wal::WALEntry::Create {
|
||||||
|
collection: collection.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
value: value_clone,
|
||||||
|
timestamp: chrono::Utc::now().timestamp_millis() as u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug!("Writing to WAL: CREATE operation for {}/{}", collection, key);
|
||||||
|
self.wal.write().await.write(&entry)
|
||||||
|
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
|
||||||
|
} else {
|
||||||
|
log::warn!("WAL is disabled, operation not logged: CREATE {}/{}", collection, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read(&self, collection: &str, key: &str) -> Result<Option<serde_json::Value>, FutrumError> {
|
||||||
|
let collection_map = self.collections.get(collection);
|
||||||
|
|
||||||
|
match collection_map {
|
||||||
|
Some(col) => {
|
||||||
|
Ok(col.get(key).map(|doc| doc.data.clone()))
|
||||||
|
}
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update(&self, collection: &str, key: &str, value: serde_json::Value) -> Result<(), FutrumError> {
|
||||||
|
let value_clone = value.clone();
|
||||||
|
|
||||||
|
let mut collection_map = self.collections
|
||||||
|
.entry(collection.to_string())
|
||||||
|
.or_insert_with(DocumentCollection::new);
|
||||||
|
|
||||||
|
match collection_map.get_mut(key) {
|
||||||
|
Some(document) => {
|
||||||
|
document.update(value);
|
||||||
|
|
||||||
|
// Write to WAL
|
||||||
|
if self.wal_enabled {
|
||||||
|
let entry = super::wal::WALEntry::Update {
|
||||||
|
collection: collection.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
value: value_clone,
|
||||||
|
timestamp: chrono::Utc::now().timestamp_millis() as u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug!("Writing to WAL: UPDATE operation for {}/{}", collection, key);
|
||||||
|
self.wal.write().await.write(&entry)
|
||||||
|
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
|
||||||
|
} else {
|
||||||
|
log::warn!("WAL is disabled, operation not logged: UPDATE {}/{}", collection, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
None => Err(FutrumError::KeyNotFound(key.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete(&self, collection: &str, key: &str) -> Result<(), FutrumError> {
|
||||||
|
let mut collection_map = self.collections
|
||||||
|
.entry(collection.to_string())
|
||||||
|
.or_insert_with(DocumentCollection::new);
|
||||||
|
|
||||||
|
if collection_map.remove(key).is_some() {
|
||||||
|
// Write to WAL
|
||||||
|
if self.wal_enabled {
|
||||||
|
let entry = super::wal::WALEntry::Delete {
|
||||||
|
collection: collection.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
timestamp: chrono::Utc::now().timestamp_millis() as u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug!("Writing to WAL: DELETE operation for {}/{}", collection, key);
|
||||||
|
self.wal.write().await.write(&entry)
|
||||||
|
.map_err(|e| FutrumError::StorageError(e.to_string()))?;
|
||||||
|
} else {
|
||||||
|
log::warn!("WAL is disabled, operation not logged: DELETE {}/{}", collection, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(FutrumError::KeyNotFound(key.to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_collections(&self) -> Vec<String> {
|
||||||
|
self.collections.iter().map(|entry| entry.key().clone()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn collection_exists(&self, collection: &str) -> bool {
|
||||||
|
self.collections.contains_key(collection)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn enable_wal(&mut self, enabled: bool) {
|
||||||
|
self.wal_enabled = enabled;
|
||||||
|
log::info!("WAL {}", if enabled { "enabled" } else { "disabled" });
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_wal_enabled(&self) -> bool {
|
||||||
|
self.wal_enabled
|
||||||
|
}
|
||||||
|
}
|
7
src/storage/mod.rs
Normal file
7
src/storage/mod.rs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
pub mod document;
|
||||||
|
pub mod wal;
|
||||||
|
pub mod engine;
|
||||||
|
|
||||||
|
pub use document::Document;
|
||||||
|
pub use wal::WriteAheadLog;
|
||||||
|
pub use engine::StorageEngine;
|
114
src/storage/wal.rs
Normal file
114
src/storage/wal.rs
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
// src/storage/wal.rs
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use std::fs::{File, OpenOptions};
|
||||||
|
use std::io::{BufWriter, Write, BufReader, Seek, SeekFrom, BufRead};
|
||||||
|
use std::path::Path;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum WALError {
|
||||||
|
#[error("IO error: {0}")]
|
||||||
|
IOError(#[from] std::io::Error),
|
||||||
|
#[error("Serialization error: {0}")]
|
||||||
|
SerializationError(#[from] rmp_serde::encode::Error),
|
||||||
|
#[error("Deserialization error: {0}")]
|
||||||
|
DeserializationError(#[from] rmp_serde::decode::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub enum WALEntry {
|
||||||
|
Create {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
value: serde_json::Value,
|
||||||
|
timestamp: u64,
|
||||||
|
},
|
||||||
|
Update {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
value: serde_json::Value,
|
||||||
|
timestamp: u64,
|
||||||
|
},
|
||||||
|
Delete {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
timestamp: u64,
|
||||||
|
},
|
||||||
|
Commit {
|
||||||
|
transaction_id: String,
|
||||||
|
timestamp: u64,
|
||||||
|
},
|
||||||
|
Rollback {
|
||||||
|
transaction_id: String,
|
||||||
|
timestamp: u64,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WALEntry {
|
||||||
|
pub fn timestamp(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
WALEntry::Create { timestamp, .. } => *timestamp,
|
||||||
|
WALEntry::Update { timestamp, .. } => *timestamp,
|
||||||
|
WALEntry::Delete { timestamp, .. } => *timestamp,
|
||||||
|
WALEntry::Commit { timestamp, .. } => *timestamp,
|
||||||
|
WALEntry::Rollback { timestamp, .. } => *timestamp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WriteAheadLog {
|
||||||
|
file: BufWriter<File>,
|
||||||
|
path: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WriteAheadLog {
|
||||||
|
pub fn new(path: &str) -> Result<Self, WALError> {
|
||||||
|
let file = OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(path)?;
|
||||||
|
|
||||||
|
log::info!("WAL file created/opened: {}", path);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
file: BufWriter::new(file),
|
||||||
|
path: path.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(&mut self, entry: &WALEntry) -> Result<(), WALError> {
|
||||||
|
let serialized = rmp_serde::to_vec(entry)?;
|
||||||
|
self.file.write_all(&serialized)?;
|
||||||
|
self.file.write_all(b"\n")?;
|
||||||
|
self.file.flush()?;
|
||||||
|
log::trace!("WAL entry written: {:?}", entry);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn replay(&self) -> Result<Vec<WALEntry>, WALError> {
|
||||||
|
let file = File::open(&self.path)?;
|
||||||
|
let reader = BufReader::new(file);
|
||||||
|
let mut entries = Vec::new();
|
||||||
|
|
||||||
|
for line in reader.lines() {
|
||||||
|
let line = line?;
|
||||||
|
if let Ok(entry) = rmp_serde::from_slice(line.as_bytes()) {
|
||||||
|
entries.push(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("Replayed {} WAL entries", entries.len());
|
||||||
|
Ok(entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clear(&mut self) -> Result<(), WALError> {
|
||||||
|
self.file.seek(SeekFrom::Start(0))?;
|
||||||
|
self.file.get_mut().set_len(0)?;
|
||||||
|
log::info!("WAL cleared");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_path(&self) -> &str {
|
||||||
|
&self.path
|
||||||
|
}
|
||||||
|
}
|
115
src/transaction/manager.rs
Normal file
115
src/transaction/manager.rs
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
use crate::storage::StorageEngine;
|
||||||
|
use crate::FutrumError;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
pub struct Transaction {
|
||||||
|
pub id: String,
|
||||||
|
pub operations: Vec<TransactionOperation>,
|
||||||
|
pub state: TransactionState,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum TransactionOperation {
|
||||||
|
Create {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
value: serde_json::Value,
|
||||||
|
},
|
||||||
|
Update {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
value: serde_json::Value,
|
||||||
|
},
|
||||||
|
Delete {
|
||||||
|
collection: String,
|
||||||
|
key: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
|
pub enum TransactionState {
|
||||||
|
Active,
|
||||||
|
Committed,
|
||||||
|
RolledBack,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TransactionManager {
|
||||||
|
storage: StorageEngine,
|
||||||
|
active_transactions: HashMap<String, Transaction>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TransactionManager {
|
||||||
|
pub fn new(storage: StorageEngine) -> Self {
|
||||||
|
Self {
|
||||||
|
storage,
|
||||||
|
active_transactions: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn begin(&mut self) -> String {
|
||||||
|
let transaction_id = Uuid::new_v4().to_string();
|
||||||
|
let transaction = Transaction {
|
||||||
|
id: transaction_id.clone(),
|
||||||
|
operations: Vec::new(),
|
||||||
|
state: TransactionState::Active,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.active_transactions.insert(transaction_id.clone(), transaction);
|
||||||
|
transaction_id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_operation(&mut self, transaction_id: &str, operation: TransactionOperation) -> Result<(), FutrumError> {
|
||||||
|
if let Some(transaction) = self.active_transactions.get_mut(transaction_id) {
|
||||||
|
if transaction.state != TransactionState::Active {
|
||||||
|
return Err(FutrumError::TransactionError("Transaction is not active".to_string()));
|
||||||
|
}
|
||||||
|
transaction.operations.push(operation);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(FutrumError::TransactionError("Transaction not found".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn commit(&mut self, transaction_id: &str) -> Result<(), FutrumError> {
|
||||||
|
if let Some(transaction) = self.active_transactions.get_mut(transaction_id) {
|
||||||
|
if transaction.state != TransactionState::Active {
|
||||||
|
return Err(FutrumError::TransactionError("Transaction is not active".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply all operations
|
||||||
|
for operation in &transaction.operations {
|
||||||
|
match operation {
|
||||||
|
TransactionOperation::Create { collection, key, value } => {
|
||||||
|
self.storage.create(collection, key, value.clone()).await?;
|
||||||
|
}
|
||||||
|
TransactionOperation::Update { collection, key, value } => {
|
||||||
|
self.storage.update(collection, key, value.clone()).await?;
|
||||||
|
}
|
||||||
|
TransactionOperation::Delete { collection, key } => {
|
||||||
|
self.storage.delete(collection, key).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.state = TransactionState::Committed;
|
||||||
|
self.active_transactions.remove(transaction_id);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(FutrumError::TransactionError("Transaction not found".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rollback(&mut self, transaction_id: &str) -> Result<(), FutrumError> {
|
||||||
|
if let Some(transaction) = self.active_transactions.get_mut(transaction_id) {
|
||||||
|
if transaction.state != TransactionState::Active {
|
||||||
|
return Err(FutrumError::TransactionError("Transaction is not active".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction.state = TransactionState::RolledBack;
|
||||||
|
self.active_transactions.remove(transaction_id);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(FutrumError::TransactionError("Transaction not found".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
3
src/transaction/mod.rs
Normal file
3
src/transaction/mod.rs
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
pub mod manager;
|
||||||
|
|
||||||
|
pub use manager::TransactionManager;
|
55
tests/integration_test.rs
Normal file
55
tests/integration_test.rs
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_storage_operations() {
|
||||||
|
let db = FutrumDB::new("test.config").unwrap();
|
||||||
|
|
||||||
|
// Test CRUD operations
|
||||||
|
let value = serde_json::json!({"test": "value"});
|
||||||
|
|
||||||
|
// Create
|
||||||
|
assert!(db.create_document("test", "key1", value.clone()).await.is_ok());
|
||||||
|
|
||||||
|
// Read
|
||||||
|
let result = db.read_document("test", "key1").await.unwrap();
|
||||||
|
assert!(result.is_some());
|
||||||
|
assert_eq!(result.unwrap(), value);
|
||||||
|
|
||||||
|
// Update
|
||||||
|
let new_value = serde_json::json!({"test": "updated"});
|
||||||
|
assert!(db.update_document("test", "key1", new_value.clone()).await.is_ok());
|
||||||
|
|
||||||
|
// Delete
|
||||||
|
assert!(db.delete_document("test", "key1").await.is_ok());
|
||||||
|
assert!(db.read_document("test", "key1").await.unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_transactions() {
|
||||||
|
let db = FutrumDB::new("test.config").unwrap();
|
||||||
|
|
||||||
|
// This would test transaction functionality
|
||||||
|
// Implementation depends on TransactionManager integration
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_command_history() {
|
||||||
|
let mut history = CommandHistory::new();
|
||||||
|
|
||||||
|
history.add_entry(
|
||||||
|
"CREATE".to_string(),
|
||||||
|
serde_json::json!({"key": "test"}),
|
||||||
|
true
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(history.len(), 1);
|
||||||
|
assert!(!history.is_empty());
|
||||||
|
|
||||||
|
let recent = history.get_recent(5);
|
||||||
|
assert_eq!(recent.len(), 1);
|
||||||
|
assert_eq!(recent[0].command, "CREATE");
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user