Upload files to "futriix-server/src"
This commit is contained in:
parent
d049d4a8dd
commit
e0649e0b5d
86
futriix-server/src/main.rs
Normal file
86
futriix-server/src/main.rs
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
mod server;
|
||||||
|
|
||||||
|
use server::*;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::Path;
|
||||||
|
use toml::Value;
|
||||||
|
use simplelog::*;
|
||||||
|
use chrono::Local;
|
||||||
|
use log::info;
|
||||||
|
use ctrlc;
|
||||||
|
use colored::Colorize;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
// Чтение конфигурации
|
||||||
|
let config_content = fs::read_to_string("futriix.config.toml")?;
|
||||||
|
let config: Value = toml::from_str(&config_content)?;
|
||||||
|
|
||||||
|
let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1");
|
||||||
|
let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16;
|
||||||
|
let log_path = config["server"]["log_path"].as_str().unwrap_or("futriix.log");
|
||||||
|
|
||||||
|
// Настройка обработчика Ctrl+C
|
||||||
|
ctrlc::set_handler(move || {
|
||||||
|
println!("{}", "Server is now shutdown".truecolor(0, 191, 255));
|
||||||
|
std::process::exit(0);
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Создаем директорию для логов, если ее нет
|
||||||
|
let log_dir = Path::new("logs");
|
||||||
|
if !log_dir.exists() {
|
||||||
|
fs::create_dir(log_dir)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Генерируем имя файла лога с датой и временем
|
||||||
|
let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S");
|
||||||
|
let log_filename = format!("futriix_{}.log", timestamp);
|
||||||
|
let full_log_path = log_dir.join(log_filename);
|
||||||
|
|
||||||
|
// Создаем симлинк на текущий лог-файл
|
||||||
|
let symlink_path = Path::new(log_path);
|
||||||
|
if symlink_path.exists() {
|
||||||
|
fs::remove_file(symlink_path)?;
|
||||||
|
}
|
||||||
|
#[cfg(unix)]
|
||||||
|
std::os::unix::fs::symlink(full_log_path.clone(), symlink_path)?;
|
||||||
|
#[cfg(windows)]
|
||||||
|
std::os::windows::fs::symlink_file(full_log_path.clone(), symlink_path)?;
|
||||||
|
|
||||||
|
// Инициализация логгера с записью в файл и консоль
|
||||||
|
let log_file = fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&full_log_path)?;
|
||||||
|
|
||||||
|
CombinedLogger::init(vec![
|
||||||
|
TermLogger::new(
|
||||||
|
LevelFilter::Info,
|
||||||
|
Config::default(),
|
||||||
|
TerminalMode::Mixed,
|
||||||
|
ColorChoice::Auto,
|
||||||
|
),
|
||||||
|
WriteLogger::new(
|
||||||
|
LevelFilter::Info,
|
||||||
|
Config::default(),
|
||||||
|
log_file,
|
||||||
|
),
|
||||||
|
])?;
|
||||||
|
|
||||||
|
// Пустая строка перед запуском (только в консоль)
|
||||||
|
println!();
|
||||||
|
|
||||||
|
// Первое информационное сообщение
|
||||||
|
println!("{}", "Futriix Server started successfully!".truecolor(0, 191, 255));
|
||||||
|
|
||||||
|
// Запуск сервера
|
||||||
|
let server = FutriixServer::new(&config);
|
||||||
|
let addr = format!("{}:{}", ip, port);
|
||||||
|
info!("Starting server on {}", addr);
|
||||||
|
info!("Log file: {}", full_log_path.display());
|
||||||
|
info!("Symlink: {}", symlink_path.display());
|
||||||
|
|
||||||
|
server.run(&addr).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
761
futriix-server/src/server.rs
Normal file
761
futriix-server/src/server.rs
Normal file
@ -0,0 +1,761 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
|
use crossbeam::queue::SegQueue;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::time;
|
||||||
|
use log::{info, error, debug, warn};
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
|
/// One node of the in-memory B-tree backing the time-series store.
///
/// `keys` holds sorted timestamps and `values[i]` is the payload stored at
/// `keys[i]`. Internal nodes additionally hold child pointers; a node with
/// `is_leaf == true` has an empty `children` vector.
#[derive(Debug, Clone)]
struct BStarTreeNode {
    // Sorted timestamps (ascending); parallel to `values`.
    keys: Vec<u128>,
    // Payloads, one per key.
    values: Vec<serde_json::Value>,
    // Child subtrees; empty for leaves. Shared via Arc<RwLock<..>> so
    // siblings can be split/relinked without moving node storage.
    children: Vec<Arc<RwLock<BStarTreeNode>>>,
    // True when this node has no children.
    is_leaf: bool,
}
|
||||||
|
|
||||||
|
impl BStarTreeNode {
|
||||||
|
fn new(is_leaf: bool) -> Self {
|
||||||
|
BStarTreeNode {
|
||||||
|
keys: Vec::new(),
|
||||||
|
values: Vec::new(),
|
||||||
|
children: Vec::new(),
|
||||||
|
is_leaf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn search(&self, timestamp: u128) -> Option<serde_json::Value> {
|
||||||
|
let mut i = 0;
|
||||||
|
while i < self.keys.len() && timestamp > self.keys[i] {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if i < self.keys.len() && timestamp == self.keys[i] {
|
||||||
|
return Some(self.values[i].clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.is_leaf {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
self.children[i].read().search(timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, timestamp: u128, value: serde_json::Value) {
|
||||||
|
let mut i = 0;
|
||||||
|
while i < self.keys.len() && timestamp > self.keys[i] {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.is_leaf {
|
||||||
|
self.keys.insert(i, timestamp);
|
||||||
|
self.values.insert(i, value);
|
||||||
|
} else {
|
||||||
|
let child = self.children[i].clone();
|
||||||
|
let mut child_guard = child.write();
|
||||||
|
|
||||||
|
if child_guard.keys.len() == 2 * 3 - 1 {
|
||||||
|
self.split_child(i, &mut child_guard);
|
||||||
|
if timestamp > self.keys[i] {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.children[i].write().insert(timestamp, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn split_child(&mut self, i: usize, child: &mut BStarTreeNode) {
|
||||||
|
let mut new_node = BStarTreeNode::new(child.is_leaf);
|
||||||
|
new_node.keys = child.keys.split_off(3 / 2);
|
||||||
|
new_node.values = child.values.split_off(3 / 2);
|
||||||
|
|
||||||
|
if !child.is_leaf {
|
||||||
|
new_node.children = child.children.split_off(3 / 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
let middle_key = child.keys.pop().unwrap();
|
||||||
|
let middle_value = child.values.pop().unwrap();
|
||||||
|
|
||||||
|
self.keys.insert(i, middle_key);
|
||||||
|
self.values.insert(i, middle_value);
|
||||||
|
self.children.insert(i + 1, Arc::new(RwLock::new(new_node)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wire protocol commands accepted by the server (MessagePack-encoded with
/// a 4-byte big-endian length prefix; see `handle_connection`).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Command {
    /// Store a new versioned value under `key`.
    Insert { key: String, value: serde_json::Value },
    /// Read `key`; a specific `version`, or the latest when `None`.
    Get { key: String, version: Option<u64> },
    /// Append a new version to an existing `key` (errors if absent).
    Update { key: String, value: serde_json::Value },
    /// Remove `key` and all of its versions.
    Delete { key: String },
    /// Open a transaction; responds with the new transaction id.
    BeginTransaction,
    /// Apply the staged writes of the given transaction id.
    CommitTransaction(u64),
    /// Discard the staged writes of the given transaction id.
    RollbackTransaction(u64),
    /// Create a secondary index over `field` of stored JSON values.
    CreateIndex { field: String },
    /// Drop the secondary index over `field`.
    DropIndex { field: String },
    /// Run `scripts/<script_name>.lua` via the external `lua` binary.
    SysExec { script_name: String },
    /// Peer-to-peer batch of `(command, timestamp)` pairs to replay.
    Replicate { commands: Vec<(Command, u128)> },
    /// Rejected at runtime: replication is config-file controlled.
    ReplicationEnable,
    /// Rejected at runtime: replication is config-file controlled.
    ReplicationDisable,
    /// Add a replication peer address.
    ReplicationAddPeer { addr: String },
    /// Remove a replication peer address.
    ReplicationRemovePeer { addr: String },
    /// Query replication state (enabled flag, peers, last sync time).
    ReplicationStatus,
    /// Serialize the whole store to a JSON file at `path`.
    BackupCreate { path: String },
    /// Replace the store with the JSON snapshot at `path`.
    BackupRestore { path: String },
    /// Insert into the time-series tree; `timestamp` defaults to now (ms).
    TimeSeriesInsert { key: String, value: serde_json::Value, timestamp: Option<u128> },
    /// Range query `[start, end]` (inclusive, ms) over a time series.
    TimeSeriesQuery { key: String, start: u128, end: u128 },
    /// Inline Lua execution; currently always rejected.
    FutExec { code: String },
}
|
||||||
|
|
||||||
|
/// Wire protocol responses, encoded the same way as `Command`.
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
    /// Command succeeded; payload present for reads/`BeginTransaction`.
    Success(Option<serde_json::Value>),
    /// Command failed; human-readable reason.
    Error(String),
    /// Answer to `Command::ReplicationStatus`.
    ReplicationStatus {
        // Whether replication was enabled in the config file.
        enabled: bool,
        // Configured peer addresses.
        peers: Vec<String>,
        // Timestamp (ms) of the last replicated command applied.
        last_sync: u128,
    },
    /// Outcome of a backup create/restore request.
    BackupStatus { success: bool, message: String },
    /// `(timestamp, value)` pairs answering a time-series range query.
    TimeSeriesData(Vec<(u128, serde_json::Value)>),
}
|
||||||
|
|
||||||
|
/// One historical version of a key's value. Each key maps to a
/// `Vec<VersionedValue>`; the last element is the current version.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VersionedValue {
    // The stored JSON payload.
    value: serde_json::Value,
    // Monotonically increasing per key (previous latest + 1).
    version: u64,
    // Creation time, milliseconds since the Unix epoch.
    timestamp: u128,
    // Transaction that staged this version, when written inside one.
    tx_id: Option<u64>,
}
|
||||||
|
|
||||||
|
/// In-memory versioned key-value server with a TCP (MessagePack) protocol,
/// an HTTP API, optional peer replication, and a time-series store.
///
/// All shared state lives behind `Arc`s, so `Clone` produces a cheap handle
/// onto the same data (see the manual `Clone` impl below).
pub struct FutriixServer {
    // key -> version history (last element is current).
    db: Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
    // time-series key -> root of its B-tree.
    time_series_db: Arc<RwLock<HashMap<String, Arc<RwLock<BStarTreeNode>>>>>,
    // Lock-free queue of commands awaiting processing.
    command_queue: Arc<SegQueue<Command>>,
    // tx id -> staged writes, applied on commit.
    transactions: Arc<RwLock<HashMap<u64, HashMap<String, VersionedValue>>>>,
    // field -> (field value -> keys containing that value).
    indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
    // Fixed at construction from the config file.
    replication_enabled: bool,
    // Peer addresses; NOTE: not behind a lock, so runtime add/remove
    // commands cannot actually mutate this (see process_command_internal).
    peer_nodes: Vec<String>,
    // Replication sync period in milliseconds.
    sync_interval: u64,
    // Timestamp (ms) of the last command replicated/applied.
    last_sync_timestamp: Arc<RwLock<u128>>,
    // Local (command, timestamp) log that the sync task ships to peers.
    command_history: Arc<RwLock<Vec<(Command, u128)>>>,
    // Next transaction id to hand out.
    next_tx_id: Arc<RwLock<u64>>,
}
|
||||||
|
|
||||||
|
impl FutriixServer {
|
||||||
|
pub fn new(config: &toml::Value) -> Self {
|
||||||
|
debug!("Initializing server with config: {:?}", config);
|
||||||
|
|
||||||
|
let replication_enabled = config.get("replication")
|
||||||
|
.and_then(|r| r.get("enabled"))
|
||||||
|
.and_then(|v| v.as_bool())
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
|
let peer_nodes = config.get("replication")
|
||||||
|
.and_then(|r| r.get("peer_nodes"))
|
||||||
|
.and_then(|n| n.as_array())
|
||||||
|
.map(|nodes| {
|
||||||
|
nodes.iter()
|
||||||
|
.filter_map(|n| n.as_str().map(|s| s.to_string()))
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let sync_interval = config.get("replication")
|
||||||
|
.and_then(|r| r.get("sync_interval"))
|
||||||
|
.and_then(|i| i.as_integer())
|
||||||
|
.map(|i| i as u64)
|
||||||
|
.unwrap_or(1000);
|
||||||
|
|
||||||
|
FutriixServer {
|
||||||
|
db: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
time_series_db: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
command_queue: Arc::new(SegQueue::new()),
|
||||||
|
transactions: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
replication_enabled,
|
||||||
|
peer_nodes,
|
||||||
|
sync_interval,
|
||||||
|
last_sync_timestamp: Arc::new(RwLock::new(0)),
|
||||||
|
command_history: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
next_tx_id: Arc::new(RwLock::new(1)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bind the TCP listener, start the HTTP API and (optionally) the
/// replication sync task, then accept connections forever.
///
/// Never returns `Ok` in normal operation — the accept loop is infinite;
/// the only exit path is an early `?` error (e.g. bind failure).
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting server on {}", addr);
    let listener = TcpListener::bind(addr).await?;

    // HTTP API listens on all interfaces at (TCP port + 1).
    let http_port = addr.split(':').last().unwrap_or("8080").parse::<u16>().unwrap_or(8080) + 1;
    let http_addr = format!("0.0.0.0:{}", http_port);
    self.start_http_api(http_addr.clone());

    info!("Futriix Server started successfully!");
    info!("Listening on: {}", addr);
    info!("HTTP API available on: {}", http_addr);

    if self.replication_enabled {
        info!("Replication enabled with peers: {:?}", self.peer_nodes);
        // Background task that periodically ships command history to peers.
        let sync_task = self.start_replication_sync();
        tokio::spawn(sync_task);
    }

    // Best-effort creation of working directories; failure is logged, not fatal.
    let scripts_dir = Path::new("scripts");
    if !scripts_dir.exists() {
        if let Err(e) = std::fs::create_dir(scripts_dir) {
            warn!("Failed to create scripts directory: {}", e);
        }
    }

    let backups_dir = Path::new("backups");
    if !backups_dir.exists() {
        if let Err(e) = std::fs::create_dir(backups_dir) {
            warn!("Failed to create backups directory: {}", e);
        }
    }

    // Accept loop: one spawned task per client connection. `self.clone()`
    // is cheap — shared state is Arc-backed (see the Clone impl).
    loop {
        match listener.accept().await {
            Ok((socket, _)) => {
                let server = self.clone();
                tokio::spawn(async move {
                    if let Err(e) = server.handle_connection(socket).await {
                        error!("Connection error: {}", e);
                    }
                });
            }
            Err(e) => {
                // Transient accept errors are logged and the loop continues.
                error!("Accept error: {}", e);
            }
        }
    }
}
|
||||||
|
|
||||||
|
/// Spawn the warp-based HTTP API on `addr` and return immediately.
///
/// Routes:
///   GET    /api/get/{key}    -> latest value for the key (JSON), or null
///   POST   /api/insert       -> body {key: value, ...}; enqueues Inserts
///   POST   /api/update       -> body {key: value, ...}; enqueues Updates
///   DELETE /api/delete/{key} -> enqueues a Delete
///
/// Writes only enqueue commands; they are applied when a TCP connection
/// drains the shared queue (see handle_connection), so HTTP writes are
/// asynchronous with respect to reads.
fn start_http_api(&self, addr: String) {
    let db = self.db.clone();
    // One queue handle per closure, since each warp `.map` moves its capture.
    let command_queue_insert = self.command_queue.clone();
    let command_queue_update = self.command_queue.clone();
    let command_queue_delete = self.command_queue.clone();

    tokio::spawn(async move {
        // Read path: synchronous lock read, latest version only.
        let get = warp::path!("api" / "get" / String)
            .and(warp::get())
            .map(move |key: String| {
                let db = db.read();
                match db.get(&key) {
                    Some(versions) => {
                        if let Some(latest) = versions.last() {
                            warp::reply::json(&latest.value)
                        } else {
                            // Key exists with an empty history.
                            warp::reply::json(&serde_json::Value::Null)
                        }
                    },
                    None => warp::reply::json(&serde_json::Value::Null),
                }
            });

        // Write paths: fire-and-forget — "ok" means enqueued, not applied.
        let insert = warp::path!("api" / "insert")
            .and(warp::post())
            .and(warp::body::json())
            .map(move |data: HashMap<String, serde_json::Value>| {
                for (key, value) in data {
                    command_queue_insert.push(Command::Insert { key, value });
                }
                warp::reply::json(&serde_json::json!({"status": "ok"}))
            });

        let update = warp::path!("api" / "update")
            .and(warp::post())
            .and(warp::body::json())
            .map(move |data: HashMap<String, serde_json::Value>| {
                for (key, value) in data {
                    command_queue_update.push(Command::Update { key, value });
                }
                warp::reply::json(&serde_json::json!({"status": "ok"}))
            });

        let delete = warp::path!("api" / "delete" / String)
            .and(warp::delete())
            .map(move |key: String| {
                command_queue_delete.push(Command::Delete { key });
                warp::reply::json(&serde_json::json!({"status": "ok"}))
            });

        let routes = get.or(insert).or(update).or(delete);

        // NOTE(review): `.unwrap()` panics the spawned task (not the server)
        // if `addr` is unparsable; addr is built internally so this should
        // not occur in practice.
        warp::serve(routes)
            .run(addr.parse::<std::net::SocketAddr>().unwrap())
            .await;
    });
}
|
||||||
|
|
||||||
|
/// Build (but do not spawn) the replication sync loop.
///
/// Every `sync_interval` ms it collects history entries newer than
/// `last_sync_timestamp` and pushes them to each peer as a single
/// `Command::Replicate` batch over a fresh TCP connection.
///
/// NOTE(review): this task never advances `last_sync_timestamp` itself —
/// it is only updated when a peer replays commands in handle_connection —
/// so the same batch may be re-sent until acknowledged indirectly; confirm
/// this is the intended at-least-once behavior.
fn start_replication_sync(&self) -> impl std::future::Future<Output = ()> + 'static {
    // Clone Arc handles so the future is 'static (owns its captures).
    let peer_nodes = self.peer_nodes.clone();
    let command_history = self.command_history.clone();
    let last_sync_timestamp = self.last_sync_timestamp.clone();
    let sync_interval = self.sync_interval;

    async move {
        let mut interval = time::interval(Duration::from_millis(sync_interval));
        loop {
            interval.tick().await;

            // Snapshot under the locks, then release before any await.
            let commands_to_sync = {
                let history = command_history.read();
                let last_sync = *last_sync_timestamp.read();
                history.iter()
                    .filter(|(_, ts)| *ts > last_sync)
                    .map(|(cmd, ts)| (cmd.clone(), *ts))
                    .collect::<Vec<_>>()
            };

            if !commands_to_sync.is_empty() {
                for peer in &peer_nodes {
                    // Unreachable peers are silently skipped this round.
                    if let Ok(mut stream) = TcpStream::connect(peer).await {
                        let replicate_cmd = Command::Replicate {
                            commands: commands_to_sync.clone(),
                        };

                        // Same framing as the client protocol:
                        // 4-byte BE length prefix + MessagePack payload.
                        if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) {
                            let len = cmd_bytes.len() as u32;
                            if let Err(e) = stream.write_all(&len.to_be_bytes()).await {
                                warn!("Failed to write command length to peer {}: {}", peer, e);
                                continue;
                            }
                            if let Err(e) = stream.write_all(&cmd_bytes).await {
                                warn!("Failed to write command to peer {}: {}", peer, e);
                            }
                        }
                    }
                }
            }
        }
    }
}
|
||||||
|
|
||||||
|
/// Serve one TCP client until it disconnects or sends a malformed frame.
///
/// Wire format per request: 4-byte big-endian length, then a
/// MessagePack-encoded `Command`. Each received command is pushed onto the
/// SHARED queue and the queue is then drained, so this connection may also
/// process commands enqueued by the HTTP API or other producers.
/// NOTE(review): responses for such foreign commands are written to THIS
/// socket — confirm clients tolerate unsolicited responses.
async fn handle_connection(
    &self,
    mut socket: TcpStream,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // ---- read one length-prefixed frame ----
        let mut len_buf = [0u8; 4];
        if let Err(e) = socket.read_exact(&mut len_buf).await {
            // Includes normal EOF on client disconnect.
            error!("Failed to read command length: {}", e);
            break;
        }
        let len = u32::from_be_bytes(len_buf) as usize;

        // NOTE(review): `len` is attacker-controlled and unbounded — a
        // hostile client can request a multi-GB allocation here.
        let mut buf = vec![0u8; len];
        if let Err(e) = socket.read_exact(&mut buf).await {
            error!("Failed to read command: {}", e);
            break;
        }

        let cmd: Command = match rmp_serde::from_slice(&buf) {
            Ok(cmd) => cmd,
            Err(e) => {
                // Malformed frame: drop the whole connection.
                error!("Failed to deserialize command: {}", e);
                break;
            }
        };

        self.command_queue.push(cmd.clone());

        // Drain the shared queue (may contain more than the command above).
        while let Some(cmd) = self.command_queue.pop() {
            if let Command::Replicate { commands } = cmd {
                // Replay a peer's batch directly into the store.
                for (cmd, ts) in commands {
                    match cmd {
                        Command::Insert { key, value } | Command::Update { key, value } => {
                            let mut db = self.db.write();
                            let timestamp = match SystemTime::now()
                                .duration_since(UNIX_EPOCH)
                            {
                                Ok(duration) => duration.as_millis(),
                                Err(e) => {
                                    // Clock before epoch: degrade to 0.
                                    error!("SystemTime error: {}", e);
                                    0
                                }
                            };
                            // Replicated writes are applied at version 0
                            // with a local timestamp, bypassing the normal
                            // version-increment path.
                            let versioned_value = VersionedValue {
                                value: value.clone(),
                                version: 0,
                                timestamp,
                                tx_id: None,
                            };
                            db.entry(key.clone())
                                .or_default()
                                .push(versioned_value);
                        }
                        _ => {
                            // Other replicated commands run through the
                            // normal processor, answering on this socket.
                            let response = self.process_command_internal(cmd);
                            if let Err(e) = Self::send_response(&mut socket, &response).await {
                                error!("Failed to send response: {}", e);
                                // Breaks the batch replay only, not the
                                // connection loop.
                                break;
                            }
                        }
                    }
                    // Advance the sync watermark per replayed command.
                    *self.last_sync_timestamp.write() = ts;
                }
                continue;
            }

            // Ordinary command: process and answer on this socket.
            let response = self.process_command_internal(cmd);
            if let Err(e) = Self::send_response(&mut socket, &response).await {
                error!("Failed to send response: {}", e);
                break;
            }
        }
    }
    Ok(())
}
|
||||||
|
|
||||||
|
async fn send_response(socket: &mut TcpStream, response: &Response) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let response_bytes = rmp_serde::to_vec(response)?;
|
||||||
|
let len = response_bytes.len() as u32;
|
||||||
|
socket.write_all(&len.to_be_bytes()).await?;
|
||||||
|
socket.write_all(&response_bytes).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_command_internal(&self, cmd: Command) -> Response {
|
||||||
|
match cmd {
|
||||||
|
Command::Insert { key, value } => {
|
||||||
|
let mut db = self.db.write();
|
||||||
|
let timestamp = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_millis();
|
||||||
|
let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1));
|
||||||
|
|
||||||
|
let versioned_value = VersionedValue {
|
||||||
|
value: value.clone(),
|
||||||
|
version,
|
||||||
|
timestamp,
|
||||||
|
tx_id: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
db.entry(key.clone())
|
||||||
|
.or_default()
|
||||||
|
.push(versioned_value.clone());
|
||||||
|
|
||||||
|
Self::update_indexes(&self.indexes, key, &value);
|
||||||
|
Response::Success(None)
|
||||||
|
},
|
||||||
|
Command::Get { key, version } => {
|
||||||
|
let db = self.db.read();
|
||||||
|
match db.get(&key) {
|
||||||
|
Some(versions) => {
|
||||||
|
let value = if let Some(v) = version {
|
||||||
|
versions.iter().find(|vv| vv.version == v)
|
||||||
|
} else {
|
||||||
|
versions.last()
|
||||||
|
};
|
||||||
|
|
||||||
|
match value {
|
||||||
|
Some(vv) => Response::Success(Some(vv.value.clone())),
|
||||||
|
None => Response::Error("Version not found".to_string()),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => Response::Error("Key not found".to_string()),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::Update { key, value } => {
|
||||||
|
let mut db = self.db.write();
|
||||||
|
if db.contains_key(&key) {
|
||||||
|
let timestamp = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_millis();
|
||||||
|
let version = db.get(&key).unwrap().last().unwrap().version + 1;
|
||||||
|
|
||||||
|
let versioned_value = VersionedValue {
|
||||||
|
value: value.clone(),
|
||||||
|
version,
|
||||||
|
timestamp,
|
||||||
|
tx_id: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
db.get_mut(&key).unwrap().push(versioned_value.clone());
|
||||||
|
Self::update_indexes(&self.indexes, key, &value);
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Key not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::Delete { key } => {
|
||||||
|
let mut db = self.db.write();
|
||||||
|
if db.remove(&key).is_some() {
|
||||||
|
Self::remove_from_indexes(&self.indexes, key);
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Key not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::BeginTransaction => {
|
||||||
|
let tx_id = {
|
||||||
|
let mut next_id = self.next_tx_id.write();
|
||||||
|
let id = *next_id;
|
||||||
|
*next_id += 1;
|
||||||
|
id
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut transactions = self.transactions.write();
|
||||||
|
transactions.insert(tx_id, HashMap::new());
|
||||||
|
Response::Success(Some(tx_id.into()))
|
||||||
|
},
|
||||||
|
Command::CommitTransaction(tx_id) => {
|
||||||
|
let mut db = self.db.write();
|
||||||
|
let mut transactions = self.transactions.write();
|
||||||
|
|
||||||
|
if let Some(tx_data) = transactions.remove(&tx_id) {
|
||||||
|
for (key, versioned_value) in tx_data {
|
||||||
|
db.entry(key.clone())
|
||||||
|
.or_default()
|
||||||
|
.push(versioned_value.clone());
|
||||||
|
Self::update_indexes(&self.indexes, key, &versioned_value.value);
|
||||||
|
}
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Transaction not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::RollbackTransaction(tx_id) => {
|
||||||
|
let mut transactions = self.transactions.write();
|
||||||
|
if transactions.remove(&tx_id).is_some() {
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Transaction not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::CreateIndex { field } => {
|
||||||
|
let mut indexes = self.indexes.write();
|
||||||
|
if !indexes.contains_key(&field) {
|
||||||
|
indexes.insert(field, HashMap::new());
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Index already exists".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::DropIndex { field } => {
|
||||||
|
let mut indexes = self.indexes.write();
|
||||||
|
if indexes.remove(&field).is_some() {
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Index not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::SysExec { script_name } => {
|
||||||
|
match Self::execute_lua_script(&script_name) {
|
||||||
|
Ok(output) => Response::Success(Some(output.into())),
|
||||||
|
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::TimeSeriesInsert { key, value, timestamp } => {
|
||||||
|
let timestamp = timestamp.unwrap_or_else(|| {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_millis()
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut ts_db = self.time_series_db.write();
|
||||||
|
let root = ts_db.entry(key.clone())
|
||||||
|
.or_insert_with(|| Arc::new(RwLock::new(BStarTreeNode::new(true))));
|
||||||
|
|
||||||
|
root.write().insert(timestamp, value);
|
||||||
|
Response::Success(None)
|
||||||
|
},
|
||||||
|
Command::TimeSeriesQuery { key, start, end } => {
|
||||||
|
let ts_db = self.time_series_db.read();
|
||||||
|
if let Some(root) = ts_db.get(&key) {
|
||||||
|
let mut result = Vec::new();
|
||||||
|
let root = root.read();
|
||||||
|
Self::query_range(&root, start, end, &mut result);
|
||||||
|
Response::TimeSeriesData(result)
|
||||||
|
} else {
|
||||||
|
Response::Error("Time series key not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::FutExec { code: _ } => {
|
||||||
|
Response::Error("Lua execution is not supported in this version".to_string())
|
||||||
|
},
|
||||||
|
Command::ReplicationEnable => {
|
||||||
|
Response::Error("Replication enable/disable must be configured in config file".to_string())
|
||||||
|
},
|
||||||
|
Command::ReplicationDisable => {
|
||||||
|
Response::Error("Replication enable/disable must be configured in config file".to_string())
|
||||||
|
},
|
||||||
|
Command::ReplicationAddPeer { addr } => {
|
||||||
|
let mut peers = self.peer_nodes.clone();
|
||||||
|
if !peers.contains(&addr) {
|
||||||
|
peers.push(addr);
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Peer already exists".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::ReplicationRemovePeer { addr } => {
|
||||||
|
let mut peers = self.peer_nodes.clone();
|
||||||
|
if let Some(pos) = peers.iter().position(|p| p == &addr) {
|
||||||
|
peers.remove(pos);
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
Response::Error("Peer not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::ReplicationStatus => Response::ReplicationStatus {
|
||||||
|
enabled: self.replication_enabled,
|
||||||
|
peers: self.peer_nodes.clone(),
|
||||||
|
last_sync: *self.last_sync_timestamp.read(),
|
||||||
|
},
|
||||||
|
Command::Replicate { .. } => Response::Success(None),
|
||||||
|
Command::BackupCreate { path } => {
|
||||||
|
match Self::create_backup(&self.db, &path) {
|
||||||
|
Ok(_) => Response::BackupStatus {
|
||||||
|
success: true,
|
||||||
|
message: format!("Backup created successfully at {}", path),
|
||||||
|
},
|
||||||
|
Err(e) => Response::BackupStatus {
|
||||||
|
success: false,
|
||||||
|
message: format!("Backup failed: {}", e),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::BackupRestore { path } => {
|
||||||
|
match Self::restore_backup(&self.db, &self.indexes, &path) {
|
||||||
|
Ok(_) => Response::BackupStatus {
|
||||||
|
success: true,
|
||||||
|
message: format!("Backup restored successfully from {}", path),
|
||||||
|
},
|
||||||
|
Err(e) => Response::BackupStatus {
|
||||||
|
success: false,
|
||||||
|
message: format!("Restore failed: {}", e),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_range(node: &BStarTreeNode, start: u128, end: u128, result: &mut Vec<(u128, serde_json::Value)>) {
|
||||||
|
let mut i = 0;
|
||||||
|
while i < node.keys.len() && node.keys[i] < start {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while i < node.keys.len() && node.keys[i] <= end {
|
||||||
|
result.push((node.keys[i], node.values[i].clone()));
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !node.is_leaf {
|
||||||
|
for j in 0..node.children.len() {
|
||||||
|
Self::query_range(&node.children[j].read(), start, end, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_backup(
|
||||||
|
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
|
||||||
|
path: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let db = db.read();
|
||||||
|
let serialized = serde_json::to_string(&*db)?;
|
||||||
|
std::fs::write(path, serialized)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn restore_backup(
|
||||||
|
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
|
||||||
|
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
|
path: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let data = std::fs::read_to_string(path)?;
|
||||||
|
let restored: HashMap<String, Vec<VersionedValue>> = serde_json::from_str(&data)?;
|
||||||
|
|
||||||
|
let mut db = db.write();
|
||||||
|
*db = restored;
|
||||||
|
|
||||||
|
let mut indexes = indexes.write();
|
||||||
|
indexes.clear();
|
||||||
|
|
||||||
|
for (key, versions) in db.iter() {
|
||||||
|
if let Some(latest) = versions.last() {
|
||||||
|
for (field, index) in indexes.iter_mut() {
|
||||||
|
if let Some(field_value) = latest.value.get(field) {
|
||||||
|
index.entry(field_value.clone())
|
||||||
|
.or_default()
|
||||||
|
.push(key.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_indexes(
|
||||||
|
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
|
key: String,
|
||||||
|
value: &serde_json::Value,
|
||||||
|
) {
|
||||||
|
let mut indexes = indexes.write();
|
||||||
|
for (field, index) in indexes.iter_mut() {
|
||||||
|
if let Some(field_value) = value.get(field) {
|
||||||
|
index.entry(field_value.clone())
|
||||||
|
.or_default()
|
||||||
|
.push(key.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_from_indexes(
|
||||||
|
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
|
key: String,
|
||||||
|
) {
|
||||||
|
let mut indexes = indexes.write();
|
||||||
|
for index in indexes.values_mut() {
|
||||||
|
for entries in index.values_mut() {
|
||||||
|
if let Some(pos) = entries.iter().position(|k| k == &key) {
|
||||||
|
entries.remove(pos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run `scripts/<script_name>.lua` through the external `lua` interpreter.
///
/// Returns the script's stdout on a zero exit status; otherwise its stderr
/// becomes the error. Requires `lua` on the server's PATH.
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
    let script_path = Path::new("scripts").join(script_name).with_extension("lua");

    let output = std::process::Command::new("lua")
        .arg(&script_path)
        .output()?;

    match output.status.success() {
        true => Ok(String::from_utf8(output.stdout)?),
        false => Err(String::from_utf8(output.stderr)?.into()),
    }
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for FutriixServer {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
FutriixServer {
|
||||||
|
db: self.db.clone(),
|
||||||
|
time_series_db: self.time_series_db.clone(),
|
||||||
|
command_queue: self.command_queue.clone(),
|
||||||
|
transactions: self.transactions.clone(),
|
||||||
|
indexes: self.indexes.clone(),
|
||||||
|
replication_enabled: self.replication_enabled,
|
||||||
|
peer_nodes: self.peer_nodes.clone(),
|
||||||
|
sync_interval: self.sync_interval,
|
||||||
|
last_sync_timestamp: self.last_sync_timestamp.clone(),
|
||||||
|
command_history: self.command_history.clone(),
|
||||||
|
next_tx_id: self.next_tx_id.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user