Delete futriix-server/src/old-server.rs

This commit is contained in:
Григорий Сафронов 2025-08-02 14:53:05 +00:00
parent 5dfdebd543
commit b34da58b07

View File

@ -1,640 +0,0 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crossbeam::queue::SegQueue;
use parking_lot::RwLock;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time;
use log::{info, error, debug, warn};
use warp::Filter;
use rlua::Lua;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Command {
Insert { key: String, value: serde_json::Value },
Get { key: String, version: Option<u64> },
Update { key: String, value: serde_json::Value },
Delete { key: String },
BeginTransaction,
CommitTransaction(u64),
RollbackTransaction(u64),
CreateIndex { field: String },
DropIndex { field: String },
SysExec { script_name: String },
Replicate { commands: Vec<(Command, u128)> },
ReplicationEnable,
ReplicationDisable,
ReplicationAddPeer { addr: String },
ReplicationRemovePeer { addr: String },
ReplicationStatus,
BackupCreate { path: String },
BackupRestore { path: String },
LuaExec { code: String },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
Success(Option<serde_json::Value>),
Error(String),
ReplicationStatus {
enabled: bool,
peers: Vec<String>,
last_sync: u128,
},
BackupStatus { success: bool, message: String },
LuaOutput(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VersionedValue {
value: serde_json::Value,
version: u64,
timestamp: u128,
tx_id: Option<u64>,
}
pub struct FutriixServer {
db: Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
command_queue: Arc<SegQueue<Command>>,
transactions: Arc<RwLock<HashMap<u64, HashMap<String, VersionedValue>>>>,
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
replication_enabled: bool,
peer_nodes: Vec<String>,
sync_interval: u64,
last_sync_timestamp: Arc<RwLock<u128>>,
command_history: Arc<RwLock<Vec<(Command, u128)>>>,
next_tx_id: Arc<RwLock<u64>>,
}
impl FutriixServer {
pub fn new(config: &toml::Value) -> Self {
debug!("Initializing server with config: {:?}", config);
let replication_enabled = config.get("replication")
.and_then(|r| r.get("enabled"))
.and_then(|v| v.as_bool())
.unwrap_or(false);
let peer_nodes = config.get("replication")
.and_then(|r| r.get("peer_nodes"))
.and_then(|n| n.as_array())
.map(|nodes| {
nodes.iter()
.filter_map(|n| n.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let sync_interval = config.get("replication")
.and_then(|r| r.get("sync_interval"))
.and_then(|i| i.as_integer())
.map(|i| i as u64)
.unwrap_or(1000);
FutriixServer {
db: Arc::new(RwLock::new(HashMap::new())),
command_queue: Arc::new(SegQueue::new()),
transactions: Arc::new(RwLock::new(HashMap::new())),
indexes: Arc::new(RwLock::new(HashMap::new())),
replication_enabled,
peer_nodes,
sync_interval,
last_sync_timestamp: Arc::new(RwLock::new(0)),
command_history: Arc::new(RwLock::new(Vec::new())),
next_tx_id: Arc::new(RwLock::new(1)),
}
}
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting server on {}", addr);
let listener = TcpListener::bind(addr).await?;
let http_port = addr.split(':').last().unwrap_or("8080").parse::<u16>().unwrap_or(8080) + 1;
let http_addr = format!("0.0.0.0:{}", http_port);
self.start_http_api(http_addr.clone());
info!("Futriix Server started successfully!");
info!("Listening on: {}", addr);
info!("HTTP API available on: {}", http_addr);
if self.replication_enabled {
info!("Replication enabled with peers: {:?}", self.peer_nodes);
let sync_task = self.start_replication_sync();
tokio::spawn(sync_task);
}
let scripts_dir = Path::new("scripts");
if !scripts_dir.exists() {
if let Err(e) = std::fs::create_dir(scripts_dir) {
warn!("Failed to create scripts directory: {}", e);
}
}
let backups_dir = Path::new("backups");
if !backups_dir.exists() {
if let Err(e) = std::fs::create_dir(backups_dir) {
warn!("Failed to create backups directory: {}", e);
}
}
loop {
match listener.accept().await {
Ok((socket, _)) => {
let server = self.clone();
tokio::spawn(async move {
if let Err(e) = server.handle_connection(socket).await {
error!("Connection error: {}", e);
}
});
}
Err(e) => {
error!("Accept error: {}", e);
}
}
}
}
fn start_http_api(&self, addr: String) {
let db = self.db.clone();
let command_queue_insert = self.command_queue.clone();
let command_queue_update = self.command_queue.clone();
let command_queue_delete = self.command_queue.clone();
tokio::spawn(async move {
let get = warp::path!("api" / "get" / String)
.and(warp::get())
.map(move |key: String| {
let db = db.read();
match db.get(&key) {
Some(versions) => {
if let Some(latest) = versions.last() {
warp::reply::json(&latest.value)
} else {
warp::reply::json(&serde_json::Value::Null)
}
},
None => warp::reply::json(&serde_json::Value::Null),
}
});
let insert = warp::path!("api" / "insert")
.and(warp::post())
.and(warp::body::json())
.map(move |data: HashMap<String, serde_json::Value>| {
for (key, value) in data {
command_queue_insert.push(Command::Insert { key, value });
}
warp::reply::json(&serde_json::json!({"status": "ok"}))
});
let update = warp::path!("api" / "update")
.and(warp::post())
.and(warp::body::json())
.map(move |data: HashMap<String, serde_json::Value>| {
for (key, value) in data {
command_queue_update.push(Command::Update { key, value });
}
warp::reply::json(&serde_json::json!({"status": "ok"}))
});
let delete = warp::path!("api" / "delete" / String)
.and(warp::delete())
.map(move |key: String| {
command_queue_delete.push(Command::Delete { key });
warp::reply::json(&serde_json::json!({"status": "ok"}))
});
let routes = get.or(insert).or(update).or(delete);
warp::serve(routes)
.run(addr.parse::<std::net::SocketAddr>().unwrap())
.await;
});
}
fn start_replication_sync(&self) -> impl std::future::Future<Output = ()> + 'static {
let peer_nodes = self.peer_nodes.clone();
let command_history = self.command_history.clone();
let last_sync_timestamp = self.last_sync_timestamp.clone();
let sync_interval = self.sync_interval;
async move {
let mut interval = time::interval(Duration::from_millis(sync_interval));
loop {
interval.tick().await;
let commands_to_sync = {
let history = command_history.read();
let last_sync = *last_sync_timestamp.read();
history.iter()
.filter(|(_, ts)| *ts > last_sync)
.map(|(cmd, ts)| (cmd.clone(), *ts))
.collect::<Vec<_>>()
};
if !commands_to_sync.is_empty() {
for peer in &peer_nodes {
if let Ok(mut stream) = TcpStream::connect(peer).await {
let replicate_cmd = Command::Replicate {
commands: commands_to_sync.clone(),
};
if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) {
let len = cmd_bytes.len() as u32;
if let Err(e) = stream.write_all(&len.to_be_bytes()).await {
warn!("Failed to write command length to peer {}: {}", peer, e);
continue;
}
if let Err(e) = stream.write_all(&cmd_bytes).await {
warn!("Failed to write command to peer {}: {}", peer, e);
}
}
}
}
}
}
}
}
async fn handle_connection(
&self,
mut socket: TcpStream,
) -> Result<(), Box<dyn std::error::Error>> {
loop {
let mut len_buf = [0u8; 4];
if let Err(e) = socket.read_exact(&mut len_buf).await {
error!("Failed to read command length: {}", e);
break;
}
let len = u32::from_be_bytes(len_buf) as usize;
let mut buf = vec![0u8; len];
if let Err(e) = socket.read_exact(&mut buf).await {
error!("Failed to read command: {}", e);
break;
}
let cmd: Command = match rmp_serde::from_slice(&buf) {
Ok(cmd) => cmd,
Err(e) => {
error!("Failed to deserialize command: {}", e);
break;
}
};
self.command_queue.push(cmd.clone());
while let Some(cmd) = self.command_queue.pop() {
if let Command::Replicate { commands } = cmd {
for (cmd, ts) in commands {
match cmd {
Command::Insert { key, value } | Command::Update { key, value } => {
let mut db = self.db.write();
let timestamp = match SystemTime::now()
.duration_since(UNIX_EPOCH)
{
Ok(duration) => duration.as_millis(),
Err(e) => {
error!("SystemTime error: {}", e);
0
}
};
let versioned_value = VersionedValue {
value: value.clone(),
version: 0,
timestamp,
tx_id: None,
};
db.entry(key.clone())
.or_default()
.push(versioned_value);
}
_ => {
let _ = Self::process_command(
&self.db,
&self.transactions,
&self.indexes,
&self.next_tx_id,
cmd,
);
}
}
*self.last_sync_timestamp.write() = ts;
}
continue;
}
let response = Self::process_command(
&self.db,
&self.transactions,
&self.indexes,
&self.next_tx_id,
cmd.clone(),
);
let response_bytes = match rmp_serde::to_vec(&response) {
Ok(bytes) => bytes,
Err(e) => {
error!("Failed to serialize response: {}", e);
break;
}
};
let len = response_bytes.len() as u32;
if let Err(e) = socket.write_all(&len.to_be_bytes()).await {
error!("Failed to write response length: {}", e);
break;
}
if let Err(e) = socket.write_all(&response_bytes).await {
error!("Failed to write response: {}", e);
break;
}
}
}
Ok(())
}
fn process_command(
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
transactions: &Arc<RwLock<HashMap<u64, HashMap<String, VersionedValue>>>>,
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
next_tx_id: &Arc<RwLock<u64>>,
cmd: Command,
) -> Response {
match cmd {
Command::Insert { key, value } => {
let mut db = db.write();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1));
let versioned_value = VersionedValue {
value: value.clone(),
version,
timestamp,
tx_id: None,
};
db.entry(key.clone())
.or_default()
.push(versioned_value.clone());
Self::update_indexes(indexes, key, &value);
Response::Success(None)
},
Command::Get { key, version } => {
let db = db.read();
match db.get(&key) {
Some(versions) => {
let value = if let Some(v) = version {
versions.iter().find(|vv| vv.version == v)
} else {
versions.last()
};
match value {
Some(vv) => Response::Success(Some(vv.value.clone())),
None => Response::Error("Version not found".to_string()),
}
},
None => Response::Error("Key not found".to_string()),
}
},
Command::Update { key, value } => {
let mut db = db.write();
if db.contains_key(&key) {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let version = db.get(&key).unwrap().last().unwrap().version + 1;
let versioned_value = VersionedValue {
value: value.clone(),
version,
timestamp,
tx_id: None,
};
db.get_mut(&key).unwrap().push(versioned_value.clone());
Self::update_indexes(indexes, key, &value);
Response::Success(None)
} else {
Response::Error("Key not found".to_string())
}
},
Command::Delete { key } => {
let mut db = db.write();
if db.remove(&key).is_some() {
Self::remove_from_indexes(indexes, key);
Response::Success(None)
} else {
Response::Error("Key not found".to_string())
}
},
Command::BeginTransaction => {
let tx_id = {
let mut next_id = next_tx_id.write();
let id = *next_id;
*next_id += 1;
id
};
let mut transactions = transactions.write();
transactions.insert(tx_id, HashMap::new());
Response::Success(Some(tx_id.into()))
},
Command::CommitTransaction(tx_id) => {
let mut db = db.write();
let mut transactions = transactions.write();
if let Some(tx_data) = transactions.remove(&tx_id) {
for (key, versioned_value) in tx_data {
db.entry(key.clone())
.or_default()
.push(versioned_value.clone());
Self::update_indexes(indexes, key, &versioned_value.value);
}
Response::Success(None)
} else {
Response::Error("Transaction not found".to_string())
}
},
Command::RollbackTransaction(tx_id) => {
let mut transactions = transactions.write();
if transactions.remove(&tx_id).is_some() {
Response::Success(None)
} else {
Response::Error("Transaction not found".to_string())
}
},
Command::CreateIndex { field } => {
let mut indexes = indexes.write();
if !indexes.contains_key(&field) {
indexes.insert(field, HashMap::new());
Response::Success(None)
} else {
Response::Error("Index already exists".to_string())
}
},
Command::DropIndex { field } => {
let mut indexes = indexes.write();
if indexes.remove(&field).is_some() {
Response::Success(None)
} else {
Response::Error("Index not found".to_string())
}
},
Command::SysExec { script_name } => {
match Self::execute_lua_script(&script_name) {
Ok(output) => Response::Success(Some(output.into())),
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
}
},
Command::LuaExec { code } => {
match Self::execute_lua_code(&code) {
Ok(output) => Response::LuaOutput(output),
Err(e) => Response::Error(format!("Lua execution failed: {}", e)),
}
},
Command::ReplicationEnable => Response::Success(None),
Command::ReplicationDisable => Response::Success(None),
Command::ReplicationAddPeer { addr: _ } => Response::Success(None),
Command::ReplicationRemovePeer { addr: _ } => Response::Success(None),
Command::ReplicationStatus => Response::Success(None),
Command::Replicate { .. } => Response::Success(None),
Command::BackupCreate { path } => {
match Self::create_backup(db, &path) {
Ok(_) => Response::BackupStatus {
success: true,
message: format!("Backup created successfully at {}", path),
},
Err(e) => Response::BackupStatus {
success: false,
message: format!("Backup failed: {}", e),
},
}
},
Command::BackupRestore { path } => {
match Self::restore_backup(db, indexes, &path) {
Ok(_) => Response::BackupStatus {
success: true,
message: format!("Backup restored successfully from {}", path),
},
Err(e) => Response::BackupStatus {
success: false,
message: format!("Restore failed: {}", e),
},
}
},
}
}
fn execute_lua_code(code: &str) -> Result<String, Box<dyn std::error::Error>> {
let lua = Lua::new();
let result = lua.load(code).eval::<rlua::String>()?;
Ok(result.to_str()?.to_string())
}
fn create_backup(
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let db = db.read();
let serialized = serde_json::to_string(&*db)?;
std::fs::write(path, serialized)?;
Ok(())
}
fn restore_backup(
db: &Arc<RwLock<HashMap<String, Vec<VersionedValue>>>>,
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let data = std::fs::read_to_string(path)?;
let restored: HashMap<String, Vec<VersionedValue>> = serde_json::from_str(&data)?;
let mut db = db.write();
*db = restored;
let mut indexes = indexes.write();
indexes.clear();
for (key, versions) in db.iter() {
if let Some(latest) = versions.last() {
for (field, index) in indexes.iter_mut() {
if let Some(field_value) = latest.value.get(field) {
index.entry(field_value.clone())
.or_default()
.push(key.clone());
}
}
}
}
Ok(())
}
fn update_indexes(
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
key: String,
value: &serde_json::Value,
) {
let mut indexes = indexes.write();
for (field, index) in indexes.iter_mut() {
if let Some(field_value) = value.get(field) {
index.entry(field_value.clone())
.or_default()
.push(key.clone());
}
}
}
fn remove_from_indexes(
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
key: String,
) {
let mut indexes = indexes.write();
for index in indexes.values_mut() {
for entries in index.values_mut() {
if let Some(pos) = entries.iter().position(|k| k == &key) {
entries.remove(pos);
}
}
}
}
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
let script_path = Path::new("scripts").join(script_name).with_extension("lua");
let output = std::process::Command::new("lua")
.arg(&script_path)
.output()?;
if output.status.success() {
Ok(String::from_utf8(output.stdout)?)
} else {
Err(String::from_utf8(output.stderr)?.into())
}
}
}
impl Clone for FutriixServer {
fn clone(&self) -> Self {
FutriixServer {
db: self.db.clone(),
command_queue: self.command_queue.clone(),
transactions: self.transactions.clone(),
indexes: self.indexes.clone(),
replication_enabled: self.replication_enabled,
peer_nodes: self.peer_nodes.clone(),
sync_interval: self.sync_interval,
last_sync_timestamp: self.last_sync_timestamp.clone(),
command_history: self.command_history.clone(),
next_tx_id: self.next_tx_id.clone(),
}
}
}