first commit
This commit is contained in:
commit
ab02e9a2a4
2338
Cargo.lock
generated
Normal file
2338
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
6
Cargo.toml
Normal file
6
Cargo.toml
Normal file
@ -0,0 +1,6 @@
|
||||
[workspace]
|
||||
resolver = "2" # Резолвер "3" для полного соответствия edition 2024
|
||||
members = [
|
||||
"futriix",
|
||||
"clif",
|
||||
]
|
11
WHAT'S NEW.txt
Normal file
11
WHAT'S NEW.txt
Normal file
@ -0,0 +1,11 @@
|
||||
|
||||
1. Логи не удаляются при каждом запуске сервера
|
||||
2. Клиент не запускается, если сервер выключен
|
||||
3. Убраны лишние пустые строки в логах, которые отображаются в терминале при запуске сервера
|
||||
|
||||
4. Добавлена поддержка MVCC-транзакций
|
||||
5. Добавлена поддержка резервного копирования, чтобы из/в субд можно было бы выгружать/загружать данные командами: futload-загружать данные, futunload-выгружать данные.
|
||||
6. Добавлена поддержка http-API
|
||||
7. Переписана репликация с асинхронной мастер-мастер на синхронную мастер-мастер
|
||||
8. Исправлена ошибка даты времени
|
||||
9. Добавлен lua-интерпретатор в эксперементальном режиме
|
13
clif/Cargo.toml
Normal file
13
clif/Cargo.toml
Normal file
@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "clif"
|
||||
version = "0.2.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
rmp-serde = "0.15"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
toml = "0.7"
|
||||
colored = "2.0"
|
||||
rustyline = "10.0"
|
468
clif/src/main.rs
Normal file
468
clif/src/main.rs
Normal file
@ -0,0 +1,468 @@
|
||||
// clif/main.rs
|
||||
use colored::Colorize;
|
||||
use rustyline::Editor;
|
||||
use rustyline::error::ReadlineError;
|
||||
use serde_json;
|
||||
use std::fs;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use toml::Value;
|
||||
|
||||
mod server {
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Command {
|
||||
Insert { key: String, value: serde_json::Value },
|
||||
Get { key: String, version: Option<u64> },
|
||||
Update { key: String, value: serde_json::Value },
|
||||
Delete { key: String },
|
||||
BeginTransaction,
|
||||
CommitTransaction(u64),
|
||||
RollbackTransaction(u64),
|
||||
CreateIndex { field: String },
|
||||
DropIndex { field: String },
|
||||
SysExec { script_name: String },
|
||||
ReplicationEnable,
|
||||
ReplicationDisable,
|
||||
ReplicationAddPeer { addr: String },
|
||||
ReplicationRemovePeer { addr: String },
|
||||
ReplicationStatus,
|
||||
BackupCreate { path: String },
|
||||
BackupRestore { path: String },
|
||||
LuaExec { code: String },
|
||||
Halt,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Response {
|
||||
Success(Option<serde_json::Value>),
|
||||
Error(String),
|
||||
ReplicationStatus {
|
||||
enabled: bool,
|
||||
peers: Vec<String>,
|
||||
last_sync: u128,
|
||||
},
|
||||
BackupStatus { success: bool, message: String },
|
||||
LuaOutput(String),
|
||||
}
|
||||
}
|
||||
|
||||
use server::{Command, Response};
|
||||
|
||||
async fn send_command(cmd: Command) -> Result<Response, Box<dyn std::error::Error>> {
|
||||
let config_content = fs::read_to_string("futriix.config.toml")?;
|
||||
let config: Value = toml::from_str(&config_content)?;
|
||||
|
||||
let ip = config["client"]["ip"].as_str().unwrap_or("127.0.0.1");
|
||||
let port = config["client"]["port"].as_integer().unwrap_or(8080) as u16;
|
||||
let addr = format!("{}:{}", ip, port);
|
||||
|
||||
let mut stream = TcpStream::connect(&addr).await?;
|
||||
|
||||
let cmd_bytes = rmp_serde::to_vec(&cmd)?;
|
||||
let len = cmd_bytes.len() as u32;
|
||||
stream.write_all(&len.to_be_bytes()).await?;
|
||||
stream.write_all(&cmd_bytes).await?;
|
||||
|
||||
let mut len_buf = [0u8; 4];
|
||||
stream.read_exact(&mut len_buf).await?;
|
||||
let len = u32::from_be_bytes(len_buf) as usize;
|
||||
|
||||
let mut buf = vec![0u8; len];
|
||||
stream.read_exact(&mut buf).await?;
|
||||
|
||||
Ok(rmp_serde::from_slice(&buf)?)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config_content = fs::read_to_string("futriix.config.toml")?;
|
||||
let config: Value = toml::from_str(&config_content)?;
|
||||
|
||||
let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1");
|
||||
let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16;
|
||||
let addr = format!("{}:{}", ip, port);
|
||||
|
||||
match TcpStream::connect(&addr).await {
|
||||
Ok(_) => {
|
||||
println!("\n{}", "Futriix CLI Client".truecolor(12, 149, 193));
|
||||
println!("Connected to server at {}", addr.green());
|
||||
println!("Type 'help' for available commands\n");
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("{}: Failed to connect to server at {}: {}",
|
||||
"Error".red(), addr, e);
|
||||
eprintln!("Please make sure futriix-server is running");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
let mut rl = Editor::<()>::new()?;
|
||||
if rl.load_history("futriix-history.txt").is_err() {
|
||||
println!("No previous history.");
|
||||
println!();
|
||||
}
|
||||
|
||||
let mut current_tx_id: Option<u64> = None;
|
||||
let mut lua_mode = false;
|
||||
|
||||
loop {
|
||||
let prompt = if lua_mode {
|
||||
"lua> ".bright_yellow()
|
||||
} else if let Some(tx_id) = current_tx_id {
|
||||
format!("futriix:tx{}:~> ", tx_id).bright_cyan()
|
||||
} else {
|
||||
"futriix:~> ".bright_cyan()
|
||||
};
|
||||
|
||||
let readline = rl.readline(&prompt);
|
||||
match readline {
|
||||
Ok(line) => {
|
||||
rl.add_history_entry(&line);
|
||||
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if lua_mode {
|
||||
if line.trim() == "exit" {
|
||||
lua_mode = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
match send_command(Command::LuaExec { code: line }).await {
|
||||
Ok(Response::LuaOutput(output)) => {
|
||||
println!("{}", output);
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let parts: Vec<&str> = line.split_whitespace().collect();
|
||||
match parts[0].to_lowercase().as_str() {
|
||||
"insert" | "i" => {
|
||||
if parts.len() < 3 {
|
||||
println!("{}", "Usage: insert <key> <json_value>".red());
|
||||
continue;
|
||||
}
|
||||
let key = parts[1].to_string();
|
||||
let value_str = parts[2..].join(" ");
|
||||
match serde_json::from_str(&value_str) {
|
||||
Ok(value) => match send_command(Command::Insert { key, value }).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", "Insert successful".bright_green());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
},
|
||||
Err(e) => println!("{}: {}", "Invalid JSON".red(), e),
|
||||
}
|
||||
},
|
||||
"get" | "g" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: get <key> [version]".red());
|
||||
continue;
|
||||
}
|
||||
let key = parts[1].to_string();
|
||||
let version = if parts.len() > 2 {
|
||||
parts[2].parse().ok()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match send_command(Command::Get { key, version }).await {
|
||||
Ok(Response::Success(Some(value))) => {
|
||||
println!("{}", serde_json::to_string_pretty(&value)?);
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Success(None)) => {
|
||||
println!("{}", "Key found but no value returned".yellow());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"update" | "u" => {
|
||||
if parts.len() < 3 {
|
||||
println!("{}", "Usage: update <key> <json_value>".red());
|
||||
continue;
|
||||
}
|
||||
let key = parts[1].to_string();
|
||||
let value_str = parts[2..].join(" ");
|
||||
match serde_json::from_str(&value_str) {
|
||||
Ok(value) => match send_command(Command::Update { key, value }).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", "Update successful".bright_green());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
},
|
||||
Err(e) => println!("{}: {}", "Invalid JSON".red(), e),
|
||||
}
|
||||
},
|
||||
"delete" | "d" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: delete <key>".red());
|
||||
continue;
|
||||
}
|
||||
let key = parts[1].to_string();
|
||||
match send_command(Command::Delete { key }).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", "Delete successful".bright_green());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"begin" | "transaction" | "tx" => {
|
||||
match send_command(Command::BeginTransaction).await {
|
||||
Ok(Response::Success(Some(tx_id))) => {
|
||||
if let Some(id) = tx_id.as_u64() {
|
||||
current_tx_id = Some(id);
|
||||
println!("{}", format!("Transaction {} started", id).bright_green());
|
||||
println!();
|
||||
} else {
|
||||
println!("{}", "Invalid transaction ID received".red());
|
||||
}
|
||||
},
|
||||
Ok(Response::Success(None)) => println!("{}", "No transaction ID received".red()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"commit" => {
|
||||
if let Some(tx_id) = current_tx_id {
|
||||
match send_command(Command::CommitTransaction(tx_id)).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", format!("Transaction {} committed", tx_id).bright_green());
|
||||
println!();
|
||||
current_tx_id = None;
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
} else {
|
||||
println!("{}", "No active transaction to commit".red());
|
||||
}
|
||||
},
|
||||
"rollback" => {
|
||||
if let Some(tx_id) = current_tx_id {
|
||||
match send_command(Command::RollbackTransaction(tx_id)).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", format!("Transaction {} rolled back", tx_id).bright_green());
|
||||
println!();
|
||||
current_tx_id = None;
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
} else {
|
||||
println!("{}", "No active transaction to rollback".red());
|
||||
}
|
||||
},
|
||||
"createindex" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: createindex <field>".red());
|
||||
continue;
|
||||
}
|
||||
let field = parts[1].to_string();
|
||||
match send_command(Command::CreateIndex { field: field.clone() }).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", format!("Index created on field '{}'", field).bright_green());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"dropindex" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: dropindex <field>".red());
|
||||
continue;
|
||||
}
|
||||
let field = parts[1].to_string();
|
||||
match send_command(Command::DropIndex { field }).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", "Index dropped".bright_green());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"sysexec" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: sysexec <script_name>".red());
|
||||
continue;
|
||||
}
|
||||
let script_name = parts[1].to_string();
|
||||
match send_command(Command::SysExec { script_name }).await {
|
||||
Ok(Response::Success(Some(output))) => {
|
||||
println!("{}", "Script output:".bright_green());
|
||||
println!("{}", output);
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Success(None)) => {
|
||||
println!("{}", "Script executed but no output".yellow());
|
||||
println!();
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"futexec" => {
|
||||
println!("{}", "Entering Lua interactive mode. Type 'exit' to quit.".bright_yellow());
|
||||
lua_mode = true;
|
||||
},
|
||||
"replication" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: replication <on|off|status|add-peer|remove-peer>".red());
|
||||
continue;
|
||||
}
|
||||
match parts[1].to_lowercase().as_str() {
|
||||
"on" => {
|
||||
match send_command(Command::ReplicationEnable).await {
|
||||
Ok(Response::Success(_)) => println!("{}", "Replication enabled".bright_green()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
}
|
||||
"off" => {
|
||||
match send_command(Command::ReplicationDisable).await {
|
||||
Ok(Response::Success(_)) => println!("{}", "Replication disabled".bright_green()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
}
|
||||
"status" => {
|
||||
match send_command(Command::ReplicationStatus).await {
|
||||
Ok(Response::ReplicationStatus { enabled, peers, last_sync }) => {
|
||||
println!("Status:");
|
||||
println!(" Enabled: {}", enabled);
|
||||
println!(" Peers: {:?}", peers);
|
||||
println!(" Last sync: {}", last_sync);
|
||||
}
|
||||
Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
}
|
||||
"add-peer" => {
|
||||
if parts.len() < 3 {
|
||||
println!("{}", "Usage: replication add-peer <addr>".red());
|
||||
continue;
|
||||
}
|
||||
match send_command(Command::ReplicationAddPeer { addr: parts[2].to_string() }).await {
|
||||
Ok(Response::Success(_)) => println!("{}", "Peer added".bright_green()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
}
|
||||
"remove-peer" => {
|
||||
if parts.len() < 3 {
|
||||
println!("{}", "Usage: replication remove-peer <addr>".red());
|
||||
continue;
|
||||
}
|
||||
match send_command(Command::ReplicationRemovePeer { addr: parts[2].to_string() }).await {
|
||||
Ok(Response::Success(_)) => println!("{}", "Peer removed".bright_green()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
}
|
||||
_ => println!("{}", "Unknown replication command".red()),
|
||||
}
|
||||
},
|
||||
"futload" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: futload <path_to_backup_file>".red());
|
||||
continue;
|
||||
}
|
||||
match send_command(Command::BackupRestore { path: parts[1].to_string() }).await {
|
||||
Ok(Response::BackupStatus { success, message }) => {
|
||||
if success {
|
||||
println!("{}", message.green());
|
||||
} else {
|
||||
println!("{}", message.red());
|
||||
}
|
||||
}
|
||||
Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"futunload" => {
|
||||
if parts.len() < 2 {
|
||||
println!("{}", "Usage: futunload <path_to_backup_file>".red());
|
||||
continue;
|
||||
}
|
||||
match send_command(Command::BackupCreate { path: parts[1].to_string() }).await {
|
||||
Ok(Response::BackupStatus { success, message }) => {
|
||||
if success {
|
||||
println!("{}", message.green());
|
||||
} else {
|
||||
println!("{}", message.red());
|
||||
}
|
||||
}
|
||||
Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()),
|
||||
Ok(Response::Error(e)) => println!("{}", e.red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"halt" => {
|
||||
match send_command(Command::Halt).await {
|
||||
Ok(Response::Success(_)) => {
|
||||
println!("{}", "Server shutdown initiated".bright_green());
|
||||
break;
|
||||
},
|
||||
Ok(Response::Error(e)) => println!("{}", e.bold().red()),
|
||||
_ => println!("{}", "Unexpected response type".red()),
|
||||
}
|
||||
},
|
||||
"help" => {
|
||||
println!();
|
||||
println!("{}", "Available commands:".bold());
|
||||
println!(" insert <key> <json_value> (or i) - Insert data");
|
||||
println!(" get <key> [version] (or g) - Get data (optionally specify version)");
|
||||
println!(" update <key> <json_value> (or u) - Update data");
|
||||
println!(" delete <key> (or d) - Delete data");
|
||||
println!(" begin (or transaction, tx) - Start transaction");
|
||||
println!(" commit - Commit current transaction");
|
||||
println!(" rollback - Rollback current transaction");
|
||||
println!(" createindex <field> - Create index");
|
||||
println!(" dropindex <field> - Drop index");
|
||||
println!(" sysexec <script_name> - Execute system script");
|
||||
println!(" futexec - Enter Lua interactive mode");
|
||||
println!(" replication on/off - Enable/disable replication");
|
||||
println!(" replication status - Show replication status");
|
||||
println!(" replication add-peer <addr> - Add replication peer");
|
||||
println!(" replication remove-peer <addr> - Remove replication peer");
|
||||
println!(" futload <path> - Load data from backup file");
|
||||
println!(" futunload <path> - Save data to backup file");
|
||||
println!(" halt - Gracefully shutdown the server");
|
||||
println!(" exit (or quit) - Exit client");
|
||||
println!(" help - Show this help");
|
||||
println!();
|
||||
},
|
||||
"exit" | "quit" => break,
|
||||
_ => println!("{}: Unknown command. Type 'help' for available commands.", "Error".red()),
|
||||
}
|
||||
},
|
||||
Err(ReadlineError::Interrupted) => break,
|
||||
Err(ReadlineError::Eof) => break,
|
||||
Err(err) => {
|
||||
println!("{}: {:?}", "Error".red(), err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rl.save_history("futriix-history.txt")?;
|
||||
Ok(())
|
||||
}
|
17
futriix.config.toml
Normal file
17
futriix.config.toml
Normal file
@ -0,0 +1,17 @@
|
||||
[server]
|
||||
ip = "127.0.0.1" # IP-адрес сервера (0.0.0.0 для доступа из сети)
|
||||
port = 8080 # Порт для TCP-сервера
|
||||
log_path = "futriix.log" # Путь к лог-файлу
|
||||
|
||||
[client]
|
||||
ip = "127.0.0.1" # IP для клиента (обычно localhost)
|
||||
port = 8080 # Порт клиента (должен совпадать с серверным)
|
||||
|
||||
[replication]
|
||||
enabled = false # Включена ли репликация
|
||||
peer_nodes = [] # Список узлов для репликации (например ["192.168.1.2:8080"])
|
||||
sync_interval = 1000 # Интервал синхронизации в мс
|
||||
|
||||
[http_api]
|
||||
enabled = true # Включить HTTP API
|
||||
port = 8081 # Порт для HTTP API (обычно на 1 больше основного)
|
22
futriix/Cargo.toml
Normal file
22
futriix/Cargo.toml
Normal file
@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "futriix"
|
||||
version = "0.2.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
rmp-serde = "0.15" # MessagePack
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
toml = "0.7"
|
||||
colored = "2.0"
|
||||
log = "0.4"
|
||||
#simplelog = { version = "0.12.2", features = ["termcolor", "std"] }
|
||||
simplelog = { version = "0.12.2", features = ["termcolor"] }
|
||||
crossbeam = "0.8"
|
||||
parking_lot = "0.12"
|
||||
ctrlc = "3.2"
|
||||
warp = "0.3" # Для HTTP API и HTTP-сервера
|
||||
futures = "0.3" # Для работы с асинхронными потоками FutureExt
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
rlua = "0.20.1"
|
77
futriix/src/main.rs
Normal file
77
futriix/src/main.rs
Normal file
@ -0,0 +1,77 @@
|
||||
// futriix/main.rs
|
||||
mod server;
|
||||
|
||||
use server::*;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use toml::Value;
|
||||
use simplelog::*;
|
||||
use chrono::Local;
|
||||
use log::info;
|
||||
use ctrlc;
|
||||
use colored::Colorize;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config_content = fs::read_to_string("futriix.config.toml")?;
|
||||
let config: Value = toml::from_str(&config_content)?;
|
||||
|
||||
let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1");
|
||||
let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16;
|
||||
let log_path = config["server"]["log_path"].as_str().unwrap_or("futriix.log");
|
||||
|
||||
ctrlc::set_handler(move || {
|
||||
println!("{}", "Server is now shutdown".truecolor(0, 191, 255));
|
||||
std::process::exit(0);
|
||||
})?;
|
||||
|
||||
let log_dir = Path::new("logs");
|
||||
if !log_dir.exists() {
|
||||
fs::create_dir(log_dir)?;
|
||||
}
|
||||
|
||||
let timestamp = Local::now().format("%d.%m.%Y %H-%M-%S");
|
||||
let log_filename = format!("futriix_{}.log", timestamp);
|
||||
let full_log_path = log_dir.join(log_filename);
|
||||
|
||||
let symlink_path = Path::new(log_path);
|
||||
if symlink_path.exists() {
|
||||
fs::remove_file(symlink_path)?;
|
||||
}
|
||||
#[cfg(unix)]
|
||||
std::os::unix::fs::symlink(full_log_path.clone(), symlink_path)?;
|
||||
#[cfg(windows)]
|
||||
std::os::windows::fs::symlink_file(full_log_path.clone(), symlink_path)?;
|
||||
|
||||
let log_file = fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&full_log_path)?;
|
||||
|
||||
CombinedLogger::init(vec![
|
||||
TermLogger::new(
|
||||
LevelFilter::Info,
|
||||
Config::default(),
|
||||
TerminalMode::Mixed,
|
||||
ColorChoice::Auto,
|
||||
),
|
||||
WriteLogger::new(
|
||||
LevelFilter::Info,
|
||||
Config::default(),
|
||||
log_file,
|
||||
),
|
||||
])?;
|
||||
|
||||
println!();
|
||||
println!("{}", "Futriix Server started successfully!".truecolor(0, 191, 255));
|
||||
|
||||
let server = FutriixServer::new(&config);
|
||||
let addr = format!("{}:{}", ip, port);
|
||||
info!("Starting server on {}", addr);
|
||||
info!("Log file: {}", full_log_path.display());
|
||||
info!("Symlink: {}", symlink_path.display());
|
||||
|
||||
server.run(&addr).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
657
futriix/src/server.rs
Normal file
657
futriix/src/server.rs
Normal file
@ -0,0 +1,657 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use crossbeam::queue::SegQueue;
|
||||
use parking_lot::RwLock;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::time;
|
||||
use log::{info, error, debug, warn};
|
||||
use warp::Filter;
|
||||
use rlua::Lua;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub enum Command {
|
||||
Insert { key: String, value: serde_json::Value },
|
||||
Get { key: String, version: Option<u64> },
|
||||
Update { key: String, value: serde_json::Value },
|
||||
Delete { key: String },
|
||||
BeginTransaction,
|
||||
CommitTransaction(u64),
|
||||
RollbackTransaction(u64),
|
||||
CreateIndex { field: String },
|
||||
DropIndex { field: String },
|
||||
SysExec { script_name: String },
|
||||
Replicate { commands: Vec<(Command, u128)> },
|
||||
ReplicationEnable,
|
||||
ReplicationDisable,
|
||||
ReplicationAddPeer { addr: String },
|
||||
ReplicationRemovePeer { addr: String },
|
||||
ReplicationStatus,
|
||||
BackupCreate { path: String },
|
||||
BackupRestore { path: String },
|
||||
LuaExec { code: String },
|
||||
Halt,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Response {
|
||||
Success(Option<serde_json::Value>),
|
||||
Error(String),
|
||||
ReplicationStatus {
|
||||
enabled: bool,
|
||||
peers: Vec<String>,
|
||||
last_sync: u128,
|
||||
},
|
||||
BackupStatus { success: bool, message: String },
|
||||
LuaOutput(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct VersionedValue {
|
||||
value: serde_json::Value,
|
||||
version: u64,
|
||||
timestamp: u128,
|
||||
tx_id: Option<u64>,
|
||||
}
|
||||
|
||||
pub struct FutriixServer {
|
||||
db: Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
|
||||
command_queue: Arc<SegQueue<Command>>,
|
||||
transactions: Arc<RwLock<HashMap<u64, HashMap<String, VersionedValue>>>>,
|
||||
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
replication_enabled: bool,
|
||||
peer_nodes: Vec<String>,
|
||||
sync_interval: u64,
|
||||
last_sync_timestamp: Arc<RwLock<u128>>,
|
||||
command_history: Arc<RwLock<Vec<(Command, u128)>>>,
|
||||
next_tx_id: Arc<RwLock<u64>>,
|
||||
shutdown: Arc<RwLock<bool>>,
|
||||
}
|
||||
|
||||
impl FutriixServer {
|
||||
pub fn new(config: &toml::Value) -> Self {
|
||||
debug!("Initializing server with config: {:?}", config);
|
||||
|
||||
let replication_enabled = config.get("replication")
|
||||
.and_then(|r| r.get("enabled"))
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
let peer_nodes = config.get("replication")
|
||||
.and_then(|r| r.get("peer_nodes"))
|
||||
.and_then(|n| n.as_array())
|
||||
.map(|nodes| {
|
||||
nodes.iter()
|
||||
.filter_map(|n| n.as_str().map(|s| s.to_string()))
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let sync_interval = config.get("replication")
|
||||
.and_then(|r| r.get("sync_interval"))
|
||||
.and_then(|i| i.as_integer())
|
||||
.map(|i| i as u64)
|
||||
.unwrap_or(1000);
|
||||
|
||||
FutriixServer {
|
||||
db: Arc::new(RwLock::new(HashMap::new())),
|
||||
command_queue: Arc::new(SegQueue::new()),
|
||||
transactions: Arc::new(RwLock::new(HashMap::new())),
|
||||
indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||
replication_enabled,
|
||||
peer_nodes,
|
||||
sync_interval,
|
||||
last_sync_timestamp: Arc::new(RwLock::new(0)),
|
||||
command_history: Arc::new(RwLock::new(Vec::new())),
|
||||
next_tx_id: Arc::new(RwLock::new(1)),
|
||||
shutdown: Arc::new(RwLock::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Starting server on {}", addr);
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
let http_port = addr.split(':').last().unwrap_or("8080").parse::<u16>().unwrap_or(8080) + 1;
|
||||
let http_addr = format!("0.0.0.0:{}", http_port);
|
||||
self.start_http_api(http_addr.clone());
|
||||
|
||||
info!("Futriix Server started successfully!");
|
||||
info!("Listening on: {}", addr);
|
||||
info!("HTTP API available on: {}", http_addr);
|
||||
|
||||
if self.replication_enabled {
|
||||
info!("Replication enabled with peers: {:?}", self.peer_nodes);
|
||||
let sync_task = self.start_replication_sync();
|
||||
tokio::spawn(sync_task);
|
||||
}
|
||||
|
||||
let scripts_dir = Path::new("scripts");
|
||||
if !scripts_dir.exists() {
|
||||
if let Err(e) = std::fs::create_dir(scripts_dir) {
|
||||
warn!("Failed to create scripts directory: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
let backups_dir = Path::new("backups");
|
||||
if !backups_dir.exists() {
|
||||
if let Err(e) = std::fs::create_dir(backups_dir) {
|
||||
warn!("Failed to create backups directory: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
if *self.shutdown.read() {
|
||||
info!("Shutdown signal received, stopping server");
|
||||
break;
|
||||
}
|
||||
|
||||
match listener.accept().await {
|
||||
Ok((socket, _)) => {
|
||||
let server = self.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = server.handle_connection(socket).await {
|
||||
error!("Connection error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Accept error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_http_api(&self, addr: String) {
|
||||
let db = self.db.clone();
|
||||
let command_queue_insert = self.command_queue.clone();
|
||||
let command_queue_update = self.command_queue.clone();
|
||||
let command_queue_delete = self.command_queue.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let get = warp::path!("api" / "get" / String)
|
||||
.and(warp::get())
|
||||
.map(move |key: String| {
|
||||
let db = db.read();
|
||||
match db.get(&key) {
|
||||
Some(versions) => {
|
||||
if let Some(latest) = versions.last() {
|
||||
warp::reply::json(&latest.value)
|
||||
} else {
|
||||
warp::reply::json(&serde_json::Value::Null)
|
||||
}
|
||||
},
|
||||
None => warp::reply::json(&serde_json::Value::Null),
|
||||
}
|
||||
});
|
||||
|
||||
let insert = warp::path!("api" / "insert")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.map(move |data: HashMap<String, serde_json::Value>| {
|
||||
for (key, value) in data {
|
||||
command_queue_insert.push(Command::Insert { key, value });
|
||||
}
|
||||
warp::reply::json(&serde_json::json!({"status": "ok"}))
|
||||
});
|
||||
|
||||
let update = warp::path!("api" / "update")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.map(move |data: HashMap<String, serde_json::Value>| {
|
||||
for (key, value) in data {
|
||||
command_queue_update.push(Command::Update { key, value });
|
||||
}
|
||||
warp::reply::json(&serde_json::json!({"status": "ok"}))
|
||||
});
|
||||
|
||||
let delete = warp::path!("api" / "delete" / String)
|
||||
.and(warp::delete())
|
||||
.map(move |key: String| {
|
||||
command_queue_delete.push(Command::Delete { key });
|
||||
warp::reply::json(&serde_json::json!({"status": "ok"}))
|
||||
});
|
||||
|
||||
let routes = get.or(insert).or(update).or(delete);
|
||||
|
||||
warp::serve(routes)
|
||||
.run(addr.parse::<std::net::SocketAddr>().unwrap())
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
fn start_replication_sync(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||
let peer_nodes = self.peer_nodes.clone();
|
||||
let command_history = self.command_history.clone();
|
||||
let last_sync_timestamp = self.last_sync_timestamp.clone();
|
||||
let sync_interval = self.sync_interval;
|
||||
|
||||
async move {
|
||||
let mut interval = time::interval(Duration::from_millis(sync_interval));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let commands_to_sync = {
|
||||
let history = command_history.read();
|
||||
let last_sync = *last_sync_timestamp.read();
|
||||
history.iter()
|
||||
.filter(|(_, ts)| *ts > last_sync)
|
||||
.map(|(cmd, ts)| (cmd.clone(), *ts))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
if !commands_to_sync.is_empty() {
|
||||
for peer in &peer_nodes {
|
||||
if let Ok(mut stream) = TcpStream::connect(peer).await {
|
||||
let replicate_cmd = Command::Replicate {
|
||||
commands: commands_to_sync.clone(),
|
||||
};
|
||||
|
||||
if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) {
|
||||
let len = cmd_bytes.len() as u32;
|
||||
if let Err(e) = stream.write_all(&len.to_be_bytes()).await {
|
||||
warn!("Failed to write command length to peer {}: {}", peer, e);
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = stream.write_all(&cmd_bytes).await {
|
||||
warn!("Failed to write command to peer {}: {}", peer, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
&self,
|
||||
mut socket: TcpStream,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
loop {
|
||||
let mut len_buf = [0u8; 4];
|
||||
if let Err(e) = socket.read_exact(&mut len_buf).await {
|
||||
error!("Failed to read command length: {}", e);
|
||||
break;
|
||||
}
|
||||
let len = u32::from_be_bytes(len_buf) as usize;
|
||||
|
||||
let mut buf = vec![0u8; len];
|
||||
if let Err(e) = socket.read_exact(&mut buf).await {
|
||||
error!("Failed to read command: {}", e);
|
||||
break;
|
||||
}
|
||||
|
||||
let cmd: Command = match rmp_serde::from_slice(&buf) {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) => {
|
||||
error!("Failed to deserialize command: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
self.command_queue.push(cmd.clone());
|
||||
|
||||
while let Some(cmd) = self.command_queue.pop() {
|
||||
if let Command::Replicate { commands } = cmd {
|
||||
for (cmd, ts) in commands {
|
||||
match cmd {
|
||||
Command::Insert { key, value } | Command::Update { key, value } => {
|
||||
let mut db = self.db.write();
|
||||
let timestamp = match SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
{
|
||||
Ok(duration) => duration.as_millis(),
|
||||
Err(e) => {
|
||||
error!("SystemTime error: {}", e);
|
||||
0
|
||||
}
|
||||
};
|
||||
let versioned_value = VersionedValue {
|
||||
value: value.clone(),
|
||||
version: 0,
|
||||
timestamp,
|
||||
tx_id: None,
|
||||
};
|
||||
db.entry(key.clone())
|
||||
.or_default()
|
||||
.push(versioned_value);
|
||||
}
|
||||
_ => {
|
||||
let _ = Self::process_command(
|
||||
&self.db,
|
||||
&self.transactions,
|
||||
&self.indexes,
|
||||
&self.next_tx_id,
|
||||
&self.shutdown,
|
||||
cmd,
|
||||
);
|
||||
}
|
||||
}
|
||||
*self.last_sync_timestamp.write() = ts;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let response = Self::process_command(
|
||||
&self.db,
|
||||
&self.transactions,
|
||||
&self.indexes,
|
||||
&self.next_tx_id,
|
||||
&self.shutdown,
|
||||
cmd.clone(),
|
||||
);
|
||||
|
||||
let response_bytes = match rmp_serde::to_vec(&response) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
error!("Failed to serialize response: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
let len = response_bytes.len() as u32;
|
||||
if let Err(e) = socket.write_all(&len.to_be_bytes()).await {
|
||||
error!("Failed to write response length: {}", e);
|
||||
break;
|
||||
}
|
||||
if let Err(e) = socket.write_all(&response_bytes).await {
|
||||
error!("Failed to write response: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_command(
|
||||
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
|
||||
transactions: &Arc<RwLock<HashMap<u64, HashMap<String, VersionedValue>>>>,
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
next_tx_id: &Arc<RwLock<u64>>,
|
||||
shutdown: &Arc<RwLock<bool>>,
|
||||
cmd: Command,
|
||||
) -> Response {
|
||||
match cmd {
|
||||
Command::Insert { key, value } => {
|
||||
let mut db = db.write();
|
||||
let timestamp = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis();
|
||||
let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1));
|
||||
|
||||
let versioned_value = VersionedValue {
|
||||
value: value.clone(),
|
||||
version,
|
||||
timestamp,
|
||||
tx_id: None,
|
||||
};
|
||||
|
||||
db.entry(key.clone())
|
||||
.or_default()
|
||||
.push(versioned_value.clone());
|
||||
|
||||
Self::update_indexes(indexes, key, &value);
|
||||
Response::Success(None)
|
||||
},
|
||||
Command::Get { key, version } => {
|
||||
let db = db.read();
|
||||
match db.get(&key) {
|
||||
Some(versions) => {
|
||||
let value = if let Some(v) = version {
|
||||
versions.iter().find(|vv| vv.version == v)
|
||||
} else {
|
||||
versions.last()
|
||||
};
|
||||
|
||||
match value {
|
||||
Some(vv) => Response::Success(Some(vv.value.clone())),
|
||||
None => Response::Error("Version not found".to_string()),
|
||||
}
|
||||
},
|
||||
None => Response::Error("Key not found".to_string()),
|
||||
}
|
||||
},
|
||||
Command::Update { key, value } => {
|
||||
let mut db = db.write();
|
||||
if db.contains_key(&key) {
|
||||
let timestamp = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis();
|
||||
let version = db.get(&key).unwrap().last().unwrap().version + 1;
|
||||
|
||||
let versioned_value = VersionedValue {
|
||||
value: value.clone(),
|
||||
version,
|
||||
timestamp,
|
||||
tx_id: None,
|
||||
};
|
||||
|
||||
db.get_mut(&key).unwrap().push(versioned_value.clone());
|
||||
Self::update_indexes(indexes, key, &value);
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Key not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::Delete { key } => {
|
||||
let mut db = db.write();
|
||||
if db.remove(&key).is_some() {
|
||||
Self::remove_from_indexes(indexes, key);
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Key not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::BeginTransaction => {
|
||||
let tx_id = {
|
||||
let mut next_id = next_tx_id.write();
|
||||
let id = *next_id;
|
||||
*next_id += 1;
|
||||
id
|
||||
};
|
||||
|
||||
let mut transactions = transactions.write();
|
||||
transactions.insert(tx_id, HashMap::new());
|
||||
Response::Success(Some(tx_id.into()))
|
||||
},
|
||||
Command::CommitTransaction(tx_id) => {
|
||||
let mut db = db.write();
|
||||
let mut transactions = transactions.write();
|
||||
|
||||
if let Some(tx_data) = transactions.remove(&tx_id) {
|
||||
for (key, versioned_value) in tx_data {
|
||||
db.entry(key.clone())
|
||||
.or_default()
|
||||
.push(versioned_value.clone());
|
||||
Self::update_indexes(indexes, key, &versioned_value.value);
|
||||
}
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Transaction not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::RollbackTransaction(tx_id) => {
|
||||
let mut transactions = transactions.write();
|
||||
if transactions.remove(&tx_id).is_some() {
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Transaction not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::CreateIndex { field } => {
|
||||
let mut indexes = indexes.write();
|
||||
if !indexes.contains_key(&field) {
|
||||
indexes.insert(field, HashMap::new());
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Index already exists".to_string())
|
||||
}
|
||||
},
|
||||
Command::DropIndex { field } => {
|
||||
let mut indexes = indexes.write();
|
||||
if indexes.remove(&field).is_some() {
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Index not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::SysExec { script_name } => {
|
||||
match Self::execute_lua_script(&script_name) {
|
||||
Ok(output) => Response::Success(Some(output.into())),
|
||||
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
|
||||
}
|
||||
},
|
||||
Command::LuaExec { code } => {
|
||||
match Self::execute_lua_code(&code) {
|
||||
Ok(output) => Response::LuaOutput(output),
|
||||
Err(e) => Response::Error(format!("Lua execution failed: {}", e)),
|
||||
}
|
||||
},
|
||||
Command::ReplicationEnable => Response::Success(None),
|
||||
Command::ReplicationDisable => Response::Success(None),
|
||||
Command::ReplicationAddPeer { addr: _ } => Response::Success(None),
|
||||
Command::ReplicationRemovePeer { addr: _ } => Response::Success(None),
|
||||
Command::ReplicationStatus => Response::Success(None),
|
||||
Command::Replicate { .. } => Response::Success(None),
|
||||
Command::BackupCreate { path } => {
|
||||
match Self::create_backup(db, &path) {
|
||||
Ok(_) => Response::BackupStatus {
|
||||
success: true,
|
||||
message: format!("Backup created successfully at {}", path),
|
||||
},
|
||||
Err(e) => Response::BackupStatus {
|
||||
success: false,
|
||||
message: format!("Backup failed: {}", e),
|
||||
},
|
||||
}
|
||||
},
|
||||
Command::BackupRestore { path } => {
|
||||
match Self::restore_backup(db, indexes, &path) {
|
||||
Ok(_) => Response::BackupStatus {
|
||||
success: true,
|
||||
message: format!("Backup restored successfully from {}", path),
|
||||
},
|
||||
Err(e) => Response::BackupStatus {
|
||||
success: false,
|
||||
message: format!("Restore failed: {}", e),
|
||||
},
|
||||
}
|
||||
},
|
||||
Command::Halt => {
|
||||
*shutdown.write() = true;
|
||||
Response::Success(None)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_lua_code(code: &str) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let lua = Lua::new();
|
||||
let result = lua.load(code).eval::<rlua::String>()?;
|
||||
Ok(result.to_str()?.to_string())
|
||||
}
|
||||
|
||||
fn create_backup(
|
||||
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
|
||||
path: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let db = db.read();
|
||||
let serialized = serde_json::to_string(&*db)?;
|
||||
std::fs::write(path, serialized)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_backup(
|
||||
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
path: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let data = std::fs::read_to_string(path)?;
|
||||
let restored: HashMap<String, Vec<VersionedValue>> = serde_json::from_str(&data)?;
|
||||
|
||||
let mut db = db.write();
|
||||
*db = restored;
|
||||
|
||||
let mut indexes = indexes.write();
|
||||
indexes.clear();
|
||||
|
||||
for (key, versions) in db.iter() {
|
||||
if let Some(latest) = versions.last() {
|
||||
for (field, index) in indexes.iter_mut() {
|
||||
if let Some(field_value) = latest.value.get(field) {
|
||||
index.entry(field_value.clone())
|
||||
.or_default()
|
||||
.push(key.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_indexes(
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
key: String,
|
||||
value: &serde_json::Value,
|
||||
) {
|
||||
let mut indexes = indexes.write();
|
||||
for (field, index) in indexes.iter_mut() {
|
||||
if let Some(field_value) = value.get(field) {
|
||||
index.entry(field_value.clone())
|
||||
.or_default()
|
||||
.push(key.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_from_indexes(
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
key: String,
|
||||
) {
|
||||
let mut indexes = indexes.write();
|
||||
for index in indexes.values_mut() {
|
||||
for entries in index.values_mut() {
|
||||
if let Some(pos) = entries.iter().position(|k| k == &key) {
|
||||
entries.remove(pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let script_path = Path::new("scripts").join(script_name).with_extension("lua");
|
||||
let output = std::process::Command::new("lua")
|
||||
.arg(&script_path)
|
||||
.output()?;
|
||||
|
||||
if output.status.success() {
|
||||
Ok(String::from_utf8(output.stdout)?)
|
||||
} else {
|
||||
Err(String::from_utf8(output.stderr)?.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for FutriixServer {
|
||||
fn clone(&self) -> Self {
|
||||
FutriixServer {
|
||||
db: self.db.clone(),
|
||||
command_queue: self.command_queue.clone(),
|
||||
transactions: self.transactions.clone(),
|
||||
indexes: self.indexes.clone(),
|
||||
replication_enabled: self.replication_enabled,
|
||||
peer_nodes: self.peer_nodes.clone(),
|
||||
sync_interval: self.sync_interval,
|
||||
last_sync_timestamp: self.last_sync_timestamp.clone(),
|
||||
command_history: self.command_history.clone(),
|
||||
next_tx_id: self.next_tx_id.clone(),
|
||||
shutdown: self.shutdown.clone(),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user