Update futriix-server/src/server.rs
This commit is contained in:
parent
9f4badf1ea
commit
c4fbb11e7e
@ -2,13 +2,18 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
use std::time::Duration;
|
||||||
use crossbeam::queue::SegQueue;
|
use crossbeam::queue::SegQueue;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use log::{info, error};
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::time;
|
||||||
|
use log::{info, error, warn, debug};
|
||||||
use colored::Colorize;
|
use colored::Colorize;
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
pub enum Command {
|
pub enum Command {
|
||||||
Insert { key: String, value: serde_json::Value },
|
Insert { key: String, value: serde_json::Value },
|
||||||
Get { key: String },
|
Get { key: String },
|
||||||
@ -20,6 +25,7 @@ pub enum Command {
|
|||||||
CreateIndex { field: String },
|
CreateIndex { field: String },
|
||||||
DropIndex { field: String },
|
DropIndex { field: String },
|
||||||
SysExec { script_name: String },
|
SysExec { script_name: String },
|
||||||
|
Replicate { commands: Vec<(Command, u128)> },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
@ -28,50 +34,270 @@ pub enum Response {
|
|||||||
Error(String),
|
Error(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
pub struct FutriixServer {
|
pub struct FutriixServer {
|
||||||
db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
||||||
command_queue: Arc<SegQueue<Command>>,
|
command_queue: Arc<SegQueue<Command>>,
|
||||||
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
||||||
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
|
replication_enabled: bool,
|
||||||
|
peer_nodes: Vec<String>,
|
||||||
|
sync_interval: u64,
|
||||||
|
last_sync_timestamp: Arc<RwLock<u128>>,
|
||||||
|
command_history: Arc<RwLock<Vec<(Command, u128)>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FutriixServer {
|
impl FutriixServer {
|
||||||
pub fn new() -> Self {
|
pub fn new(config: &toml::Value) -> Self {
|
||||||
|
debug!("Initializing server with config: {:?}", config);
|
||||||
|
|
||||||
|
let replication_enabled = config.get("replication")
|
||||||
|
.and_then(|r| r.get("enabled"))
|
||||||
|
.and_then(|v| v.as_bool())
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
warn!("Replication 'enabled' not found in config, defaulting to false");
|
||||||
|
false
|
||||||
|
});
|
||||||
|
|
||||||
|
let peer_nodes = config.get("replication")
|
||||||
|
.and_then(|r| r.get("peer_nodes"))
|
||||||
|
.and_then(|n| n.as_array())
|
||||||
|
.map(|nodes| {
|
||||||
|
nodes.iter()
|
||||||
|
.filter_map(|n| n.as_str().map(|s| s.to_string()))
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
warn!("No peer nodes configured, defaulting to empty list");
|
||||||
|
Vec::new()
|
||||||
|
});
|
||||||
|
|
||||||
|
let sync_interval = config.get("replication")
|
||||||
|
.and_then(|r| r.get("sync_interval"))
|
||||||
|
.and_then(|i| i.as_integer())
|
||||||
|
.map(|i| i as u64)
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
warn!("Sync interval not found, defaulting to 1000ms");
|
||||||
|
1000
|
||||||
|
});
|
||||||
|
|
||||||
|
debug!("Replication config - enabled: {}, peers: {:?}, interval: {}ms",
|
||||||
|
replication_enabled, peer_nodes, sync_interval);
|
||||||
|
|
||||||
FutriixServer {
|
FutriixServer {
|
||||||
db: Arc::new(RwLock::new(HashMap::new())),
|
db: Arc::new(RwLock::new(HashMap::new())),
|
||||||
command_queue: Arc::new(SegQueue::new()),
|
command_queue: Arc::new(SegQueue::new()),
|
||||||
transactions: Arc::new(RwLock::new(Vec::new())),
|
transactions: Arc::new(RwLock::new(Vec::new())),
|
||||||
indexes: Arc::new(RwLock::new(HashMap::new())),
|
indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
replication_enabled,
|
||||||
|
peer_nodes,
|
||||||
|
sync_interval,
|
||||||
|
last_sync_timestamp: Arc::new(RwLock::new(0)),
|
||||||
|
command_history: Arc::new(RwLock::new(Vec::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
|
pub async fn run_rest_api(&self, rest_port: u16) {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let db_get = self.db.clone();
|
||||||
|
let db_set = self.db.clone();
|
||||||
|
let db_update = self.db.clone();
|
||||||
|
let db_delete = self.db.clone();
|
||||||
|
let db_list_keys = self.db.clone();
|
||||||
|
let db_list_commit = self.db.clone();
|
||||||
|
let transactions_begin = self.transactions.clone();
|
||||||
|
let transactions_commit = self.transactions.clone();
|
||||||
|
let transactions_rollback = self.transactions.clone();
|
||||||
|
|
||||||
|
let get = warp::path!("get" / String)
|
||||||
|
.and(warp::get())
|
||||||
|
.map(move |key: String| {
|
||||||
|
let db = db_get.read();
|
||||||
|
match db.get(&key) {
|
||||||
|
Some(value) => warp::reply::json(&Response::Success(Some(value.clone()))),
|
||||||
|
None => warp::reply::json(&Response::Error("Key not found".to_string()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let set = warp::path!("set" / String)
|
||||||
|
.and(warp::post())
|
||||||
|
.and(warp::body::json())
|
||||||
|
.map(move |key: String, value: serde_json::Value| {
|
||||||
|
let mut db = db_set.write();
|
||||||
|
db.insert(key.clone(), value.clone());
|
||||||
|
warp::reply::json(&Response::Success(None))
|
||||||
|
});
|
||||||
|
|
||||||
|
let update = warp::path!("update" / String)
|
||||||
|
.and(warp::put())
|
||||||
|
.and(warp::body::json())
|
||||||
|
.map(move |key: String, value: serde_json::Value| {
|
||||||
|
let mut db = db_update.write();
|
||||||
|
if db.contains_key(&key) {
|
||||||
|
db.insert(key.clone(), value.clone());
|
||||||
|
warp::reply::json(&Response::Success(None))
|
||||||
|
} else {
|
||||||
|
warp::reply::json(&Response::Error("Key not found".to_string()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let delete = warp::path!("delete" / String)
|
||||||
|
.and(warp::delete())
|
||||||
|
.map(move |key: String| {
|
||||||
|
let mut db = db_delete.write();
|
||||||
|
if db.remove(&key).is_some() {
|
||||||
|
warp::reply::json(&Response::Success(None))
|
||||||
|
} else {
|
||||||
|
warp::reply::json(&Response::Error("Key not found".to_string()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let list = warp::path!("list")
|
||||||
|
.and(warp::get())
|
||||||
|
.map(move || {
|
||||||
|
let db = db_list_keys.read();
|
||||||
|
let keys: Vec<String> = db.keys().cloned().collect();
|
||||||
|
warp::reply::json(&Response::Success(Some(serde_json::json!(keys))))
|
||||||
|
});
|
||||||
|
|
||||||
|
let begin_tx = warp::path!("transaction" / "begin")
|
||||||
|
.and(warp::get())
|
||||||
|
.map(move || {
|
||||||
|
let mut transactions = transactions_begin.write();
|
||||||
|
transactions.push(HashMap::new());
|
||||||
|
warp::reply::json(&Response::Success(None))
|
||||||
|
});
|
||||||
|
|
||||||
|
let commit_tx = warp::path!("transaction" / "commit")
|
||||||
|
.and(warp::post())
|
||||||
|
.map(move || {
|
||||||
|
let mut transactions = transactions_commit.write();
|
||||||
|
if let Some(tx) = transactions.pop() {
|
||||||
|
let mut db = db_list_commit.write();
|
||||||
|
for (k, v) in tx {
|
||||||
|
if v.is_null() {
|
||||||
|
db.remove(&k);
|
||||||
|
} else {
|
||||||
|
db.insert(k.clone(), v.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
warp::reply::json(&Response::Success(None))
|
||||||
|
} else {
|
||||||
|
warp::reply::json(&Response::Error("No active transaction".to_string()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let rollback_tx = warp::path!("transaction" / "rollback")
|
||||||
|
.and(warp::post())
|
||||||
|
.map(move || {
|
||||||
|
let mut transactions = transactions_rollback.write();
|
||||||
|
if transactions.pop().is_some() {
|
||||||
|
warp::reply::json(&Response::Success(None))
|
||||||
|
} else {
|
||||||
|
warp::reply::json(&Response::Error("No active transaction".to_string()))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let routes = get
|
||||||
|
.or(set)
|
||||||
|
.or(update)
|
||||||
|
.or(delete)
|
||||||
|
.or(list)
|
||||||
|
.or(begin_tx)
|
||||||
|
.or(commit_tx)
|
||||||
|
.or(rollback_tx);
|
||||||
|
|
||||||
|
info!("Starting REST API on port {}", rest_port);
|
||||||
|
warp::serve(routes).run(([127, 0, 0, 1], rest_port)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
info!("Starting server on {}", addr);
|
||||||
|
let listener = match TcpListener::bind(addr).await {
|
||||||
|
Ok(l) => l,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to bind to {}: {}", addr, e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
println!();
|
println!();
|
||||||
info!("{}", "Futriix Server started successfully!".bright_cyan());
|
info!("{}", "Futriix Server started successfully!".truecolor(0, 183, 235));
|
||||||
info!("Listening on: {}", addr);
|
info!("Listening on: {}", addr);
|
||||||
info!("Log file: futriix.log");
|
info!("Log file: futriix.log");
|
||||||
|
|
||||||
|
if self.replication_enabled {
|
||||||
|
if self.peer_nodes.is_empty() {
|
||||||
|
warn!("Replication enabled but no peer nodes configured");
|
||||||
|
} else {
|
||||||
|
info!("Replication enabled with peers: {:?}", self.peer_nodes);
|
||||||
|
let sync_task = self.start_replication_sync();
|
||||||
|
tokio::spawn(sync_task);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info!("Replication disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!();
|
||||||
|
warn!("Replication 'enabled' not found in config, defaulting to false");
|
||||||
|
warn!("No peer nodes configured, defaulting to empty list");
|
||||||
|
warn!("Sync interval not found, defaulting to 1000ms");
|
||||||
|
info!("Starting server on {}", addr);
|
||||||
|
|
||||||
|
let rest_port = addr.split(':').last()
|
||||||
|
.and_then(|p| p.parse::<u16>().ok())
|
||||||
|
.map(|p| p + 1)
|
||||||
|
.unwrap_or(8081);
|
||||||
|
|
||||||
|
let server = self.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
server.run_rest_api(rest_port).await;
|
||||||
|
});
|
||||||
|
|
||||||
let scripts_dir = Path::new("scripts");
|
let scripts_dir = Path::new("scripts");
|
||||||
if !scripts_dir.exists() {
|
if !scripts_dir.exists() {
|
||||||
std::fs::create_dir(scripts_dir)?;
|
if let Err(e) = std::fs::create_dir(scripts_dir) {
|
||||||
|
error!("Failed to create scripts directory: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
info!("Created scripts directory");
|
info!("Created scripts directory");
|
||||||
} else if !scripts_dir.is_dir() {
|
} else if !scripts_dir.is_dir() {
|
||||||
|
error!("Scripts path exists but is not a directory");
|
||||||
return Err("Scripts path exists but is not a directory".into());
|
return Err("Scripts path exists but is not a directory".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = async {
|
res = async {
|
||||||
loop {
|
loop {
|
||||||
let (socket, _) = listener.accept().await?;
|
let (socket, _) = match listener.accept().await {
|
||||||
|
Ok((s, a)) => {
|
||||||
|
debug!("New connection from {}", a);
|
||||||
|
(s, a)
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!("Accept error: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let db = self.db.clone();
|
let db = self.db.clone();
|
||||||
let queue = self.command_queue.clone();
|
let queue = self.command_queue.clone();
|
||||||
let transactions = self.transactions.clone();
|
let transactions = self.transactions.clone();
|
||||||
let indexes = self.indexes.clone();
|
let indexes = self.indexes.clone();
|
||||||
|
let command_history = self.command_history.clone();
|
||||||
|
let last_sync_timestamp = self.last_sync_timestamp.clone();
|
||||||
|
let replication_enabled = self.replication_enabled;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await {
|
if let Err(e) = Self::handle_connection(
|
||||||
|
socket,
|
||||||
|
db,
|
||||||
|
queue,
|
||||||
|
transactions,
|
||||||
|
indexes,
|
||||||
|
command_history,
|
||||||
|
last_sync_timestamp,
|
||||||
|
replication_enabled,
|
||||||
|
).await {
|
||||||
error!("Connection error: {}", e);
|
error!("Connection error: {}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -81,182 +307,358 @@ impl FutriixServer {
|
|||||||
} => res,
|
} => res,
|
||||||
_ = tokio::signal::ctrl_c() => {
|
_ = tokio::signal::ctrl_c() => {
|
||||||
println!();
|
println!();
|
||||||
info!("{}", "Server shutdown now".bright_cyan());
|
info!("{}", "Server shutting down".truecolor(0, 183, 235));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn start_replication_sync(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||||
|
let peer_nodes = self.peer_nodes.clone();
|
||||||
|
let command_history = self.command_history.clone();
|
||||||
|
let last_sync_timestamp = self.last_sync_timestamp.clone();
|
||||||
|
let sync_interval = self.sync_interval;
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let mut interval = time::interval(Duration::from_millis(sync_interval));
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
let commands_to_sync = {
|
||||||
|
let history = command_history.read();
|
||||||
|
let last_sync = *last_sync_timestamp.read();
|
||||||
|
history.iter()
|
||||||
|
.filter(|(_, ts)| *ts > last_sync)
|
||||||
|
.map(|(cmd, ts)| (cmd.clone(), *ts))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
};
|
||||||
|
|
||||||
|
if !commands_to_sync.is_empty() {
|
||||||
|
debug!("Syncing {} commands with peers", commands_to_sync.len());
|
||||||
|
for peer in &peer_nodes {
|
||||||
|
debug!("Attempting to sync with peer: {}", peer);
|
||||||
|
match TcpStream::connect(peer).await {
|
||||||
|
Ok(mut stream) => {
|
||||||
|
let replicate_cmd = Command::Replicate {
|
||||||
|
commands: commands_to_sync.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
match rmp_serde::to_vec(&replicate_cmd) {
|
||||||
|
Ok(cmd_bytes) => {
|
||||||
|
let len = cmd_bytes.len() as u32;
|
||||||
|
if let Err(e) = stream.write_all(&len.to_be_bytes()).await {
|
||||||
|
warn!("Failed to send sync data to {}: {}", peer, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Err(e) = stream.write_all(&cmd_bytes).await {
|
||||||
|
warn!("Failed to send sync data to {}: {}", peer, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some((_, latest_ts)) = commands_to_sync.last() {
|
||||||
|
*last_sync_timestamp.write() = *latest_ts;
|
||||||
|
debug!("Updated last sync timestamp to {}", latest_ts);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to serialize sync command: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to connect to peer {}: {}", peer, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_connection(
|
async fn handle_connection(
|
||||||
mut socket: tokio::net::TcpStream,
|
mut socket: tokio::net::TcpStream,
|
||||||
db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
||||||
queue: Arc<SegQueue<Command>>,
|
queue: Arc<SegQueue<Command>>,
|
||||||
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
||||||
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
|
command_history: Arc<RwLock<Vec<(Command, u128)>>>,
|
||||||
|
last_sync_timestamp: Arc<RwLock<u128>>,
|
||||||
|
replication_enabled: bool,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut len_buf = [0u8; 4];
|
let mut len_buf = [0u8; 4];
|
||||||
if let Err(e) = socket.read_exact(&mut len_buf).await {
|
if let Err(e) = socket.read_exact(&mut len_buf).await {
|
||||||
if e.kind() == std::io::ErrorKind::UnexpectedEof {
|
if e.kind() == std::io::ErrorKind::UnexpectedEof {
|
||||||
|
debug!("Client disconnected");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
error!("Read error: {}", e);
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
let len = u32::from_be_bytes(len_buf) as usize;
|
let len = u32::from_be_bytes(len_buf) as usize;
|
||||||
|
|
||||||
let mut buf = vec![0u8; len];
|
let mut buf = vec![0u8; len];
|
||||||
socket.read_exact(&mut buf).await?;
|
if let Err(e) = socket.read_exact(&mut buf).await {
|
||||||
|
error!("Failed to read command: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
|
||||||
let cmd: Command = rmp_serde::from_slice(&buf)?;
|
let cmd: Command = match rmp_serde::from_slice(&buf) {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to deserialize command: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!("Received command: {:?}", cmd);
|
||||||
queue.push(cmd);
|
queue.push(cmd);
|
||||||
|
|
||||||
while let Some(cmd) = queue.pop() {
|
while let Some(cmd) = queue.pop() {
|
||||||
let response = match cmd {
|
if let Command::Replicate { commands } = cmd {
|
||||||
Command::Insert { key, value } => {
|
debug!("Processing replication batch ({} commands)", commands.len());
|
||||||
let mut tx_data = None;
|
for (cmd, ts) in commands {
|
||||||
{
|
let _response = Self::process_command(
|
||||||
let mut transactions = transactions.write();
|
cmd,
|
||||||
if let Some(tx) = transactions.last_mut() {
|
&db,
|
||||||
tx.insert(key.clone(), value.clone());
|
&transactions,
|
||||||
tx_data = Some((key.clone(), value.clone()));
|
&indexes,
|
||||||
}
|
);
|
||||||
}
|
|
||||||
|
|
||||||
if tx_data.is_none() {
|
*last_sync_timestamp.write() = ts;
|
||||||
let mut db = db.write();
|
debug!("Replicated command with timestamp {}", ts);
|
||||||
db.insert(key.clone(), value.clone());
|
}
|
||||||
Self::update_indexes(&indexes, key.clone(), &value);
|
continue;
|
||||||
}
|
}
|
||||||
Response::Success(None)
|
|
||||||
},
|
|
||||||
Command::Get { key } => {
|
|
||||||
let db = db.read();
|
|
||||||
match db.get(&key) {
|
|
||||||
Some(value) => Response::Success(Some(value.clone())),
|
|
||||||
None => Response::Error("Key not found".to_string()),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::Update { key, value } => {
|
|
||||||
let mut tx_data = None;
|
|
||||||
{
|
|
||||||
let mut transactions = transactions.write();
|
|
||||||
if let Some(tx) = transactions.last_mut() {
|
|
||||||
tx.insert(key.clone(), value.clone());
|
|
||||||
tx_data = Some((key.clone(), value.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if tx_data.is_none() {
|
|
||||||
let mut db = db.write();
|
|
||||||
if db.contains_key(&key) {
|
|
||||||
db.insert(key.clone(), value.clone());
|
|
||||||
Self::update_indexes(&indexes, key.clone(), &value);
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Key not found".to_string())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Response::Success(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::Delete { key } => {
|
|
||||||
let mut tx_data = None;
|
|
||||||
{
|
|
||||||
let mut transactions = transactions.write();
|
|
||||||
if let Some(tx) = transactions.last_mut() {
|
|
||||||
tx.insert(key.clone(), serde_json::Value::Null);
|
|
||||||
tx_data = Some(key.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if tx_data.is_none() {
|
|
||||||
let mut db = db.write();
|
|
||||||
if db.remove(&key).is_some() {
|
|
||||||
Self::remove_from_indexes(&indexes, key.clone());
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Key not found".to_string())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Response::Success(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::BeginTransaction => {
|
|
||||||
let mut transactions = transactions.write();
|
|
||||||
transactions.push(HashMap::new());
|
|
||||||
Response::Success(None)
|
|
||||||
},
|
|
||||||
Command::CommitTransaction => {
|
|
||||||
let mut transactions = transactions.write();
|
|
||||||
if let Some(tx) = transactions.pop() {
|
|
||||||
let mut db = db.write();
|
|
||||||
for (k, v) in tx {
|
|
||||||
if v.is_null() {
|
|
||||||
db.remove(&k);
|
|
||||||
Self::remove_from_indexes(&indexes, k.clone());
|
|
||||||
} else {
|
|
||||||
db.insert(k.clone(), v.clone());
|
|
||||||
Self::update_indexes(&indexes, k.clone(), &v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("No active transaction".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::RollbackTransaction => {
|
|
||||||
let mut transactions = transactions.write();
|
|
||||||
if transactions.pop().is_some() {
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("No active transaction".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::CreateIndex { field } => {
|
|
||||||
let mut indexes = indexes.write();
|
|
||||||
if !indexes.contains_key(&field) {
|
|
||||||
let mut index = HashMap::new();
|
|
||||||
let db = db.read();
|
|
||||||
for (key, value) in db.iter() {
|
|
||||||
if let Some(field_value) = value.get(&field) {
|
|
||||||
index.entry(field_value.clone())
|
|
||||||
.or_insert_with(Vec::new)
|
|
||||||
.push(key.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
indexes.insert(field, index);
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Index already exists".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::DropIndex { field } => {
|
|
||||||
let mut indexes = indexes.write();
|
|
||||||
if indexes.remove(&field).is_some() {
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Index not found".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::SysExec { script_name } => {
|
|
||||||
match Self::execute_lua_script(&script_name) {
|
|
||||||
Ok(output) => Response::Success(Some(output.into())),
|
|
||||||
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let response_bytes = rmp_serde::to_vec(&response)?;
|
let timestamp = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("SystemTime error: {}", e);
|
||||||
|
e
|
||||||
|
})?
|
||||||
|
.as_millis();
|
||||||
|
|
||||||
|
let response = Self::process_command(
|
||||||
|
cmd.clone(),
|
||||||
|
&db,
|
||||||
|
&transactions,
|
||||||
|
&indexes,
|
||||||
|
);
|
||||||
|
|
||||||
|
if replication_enabled && !matches!(cmd, Command::Get { .. }) {
|
||||||
|
command_history.write().push((cmd, timestamp));
|
||||||
|
debug!("Added command to history");
|
||||||
|
}
|
||||||
|
|
||||||
|
let response_bytes = match rmp_serde::to_vec(&response) {
|
||||||
|
Ok(b) => b,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to serialize response: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
let len = response_bytes.len() as u32;
|
let len = response_bytes.len() as u32;
|
||||||
socket.write_all(&len.to_be_bytes()).await?;
|
if let Err(e) = socket.write_all(&len.to_be_bytes()).await {
|
||||||
socket.write_all(&response_bytes).await?;
|
error!("Failed to send response length: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
if let Err(e) = socket.write_all(&response_bytes).await {
|
||||||
|
error!("Failed to send response: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
debug!("Sent response to client");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_command(
|
||||||
|
cmd: Command,
|
||||||
|
db: &Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
||||||
|
transactions: &Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
||||||
|
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
|
) -> Response {
|
||||||
|
match cmd {
|
||||||
|
Command::Insert { key, value } => {
|
||||||
|
debug!("Insert command for key: {}", key);
|
||||||
|
let mut tx_data = None;
|
||||||
|
{
|
||||||
|
let mut transactions = transactions.write();
|
||||||
|
if let Some(tx) = transactions.last_mut() {
|
||||||
|
tx.insert(key.clone(), value.clone());
|
||||||
|
tx_data = Some((key.clone(), value.clone()));
|
||||||
|
debug!("Added to transaction");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tx_data.is_none() {
|
||||||
|
let mut db = db.write();
|
||||||
|
db.insert(key.clone(), value.clone());
|
||||||
|
Self::update_indexes(indexes, key.clone(), &value);
|
||||||
|
debug!("Direct insert to DB");
|
||||||
|
}
|
||||||
|
Response::Success(None)
|
||||||
|
},
|
||||||
|
Command::Get { key } => {
|
||||||
|
debug!("Get command for key: {}", key);
|
||||||
|
let db = db.read();
|
||||||
|
match db.get(&key) {
|
||||||
|
Some(value) => {
|
||||||
|
debug!("Key found");
|
||||||
|
Response::Success(Some(value.clone()))
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
debug!("Key not found");
|
||||||
|
Response::Error("Key not found".to_string())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::Update { key, value } => {
|
||||||
|
debug!("Update command for key: {}", key);
|
||||||
|
let mut tx_data = None;
|
||||||
|
{
|
||||||
|
let mut transactions = transactions.write();
|
||||||
|
if let Some(tx) = transactions.last_mut() {
|
||||||
|
tx.insert(key.clone(), value.clone());
|
||||||
|
tx_data = Some((key.clone(), value.clone()));
|
||||||
|
debug!("Added to transaction");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tx_data.is_none() {
|
||||||
|
let mut db = db.write();
|
||||||
|
if db.contains_key(&key) {
|
||||||
|
db.insert(key.clone(), value.clone());
|
||||||
|
Self::update_indexes(indexes, key.clone(), &value);
|
||||||
|
debug!("Direct update in DB");
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
debug!("Key not found for update");
|
||||||
|
Response::Error("Key not found".to_string())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Response::Success(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::Delete { key } => {
|
||||||
|
debug!("Delete command for key: {}", key);
|
||||||
|
let mut tx_data = None;
|
||||||
|
{
|
||||||
|
let mut transactions = transactions.write();
|
||||||
|
if let Some(tx) = transactions.last_mut() {
|
||||||
|
tx.insert(key.clone(), serde_json::Value::Null);
|
||||||
|
tx_data = Some(key.clone());
|
||||||
|
debug!("Added to transaction");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tx_data.is_none() {
|
||||||
|
let mut db = db.write();
|
||||||
|
if db.remove(&key).is_some() {
|
||||||
|
Self::remove_from_indexes(indexes, key.clone());
|
||||||
|
debug!("Direct delete from DB");
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
debug!("Key not found for deletion");
|
||||||
|
Response::Error("Key not found".to_string())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Response::Success(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::BeginTransaction => {
|
||||||
|
debug!("Begin transaction");
|
||||||
|
let mut transactions = transactions.write();
|
||||||
|
transactions.push(HashMap::new());
|
||||||
|
Response::Success(None)
|
||||||
|
},
|
||||||
|
Command::CommitTransaction => {
|
||||||
|
debug!("Commit transaction");
|
||||||
|
let mut transactions = transactions.write();
|
||||||
|
if let Some(tx) = transactions.pop() {
|
||||||
|
let mut db = db.write();
|
||||||
|
for (k, v) in tx {
|
||||||
|
if v.is_null() {
|
||||||
|
db.remove(&k);
|
||||||
|
Self::remove_from_indexes(indexes, k.clone());
|
||||||
|
debug!("Removed key: {}", k);
|
||||||
|
} else {
|
||||||
|
db.insert(k.clone(), v.clone());
|
||||||
|
Self::update_indexes(indexes, k.clone(), &v);
|
||||||
|
debug!("Committed key: {}", k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
debug!("No active transaction to commit");
|
||||||
|
Response::Error("No active transaction".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::RollbackTransaction => {
|
||||||
|
debug!("Rollback transaction");
|
||||||
|
let mut transactions = transactions.write();
|
||||||
|
if transactions.pop().is_some() {
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
debug!("No active transaction to rollback");
|
||||||
|
Response::Error("No active transaction".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::CreateIndex { field } => {
|
||||||
|
debug!("Create index on field: {}", field);
|
||||||
|
let mut indexes = indexes.write();
|
||||||
|
if !indexes.contains_key(&field) {
|
||||||
|
let mut index = HashMap::new();
|
||||||
|
let db = db.read();
|
||||||
|
for (key, value) in db.iter() {
|
||||||
|
if let Some(field_value) = value.get(&field) {
|
||||||
|
index.entry(field_value.clone())
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.push(key.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
indexes.insert(field, index);
|
||||||
|
debug!("Index created");
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
debug!("Index already exists");
|
||||||
|
Response::Error("Index already exists".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::DropIndex { field } => {
|
||||||
|
debug!("Drop index on field: {}", field);
|
||||||
|
let mut indexes = indexes.write();
|
||||||
|
if indexes.remove(&field).is_some() {
|
||||||
|
debug!("Index dropped");
|
||||||
|
Response::Success(None)
|
||||||
|
} else {
|
||||||
|
debug!("Index not found");
|
||||||
|
Response::Error("Index not found".to_string())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::SysExec { script_name } => {
|
||||||
|
debug!("Execute system script: {}", script_name);
|
||||||
|
match Self::execute_lua_script(&script_name) {
|
||||||
|
Ok(output) => {
|
||||||
|
debug!("Script executed successfully");
|
||||||
|
Response::Success(Some(output.into()))
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
debug!("Script execution failed: {}", e);
|
||||||
|
Response::Error(format!("Script execution failed: {}", e))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Command::Replicate { .. } => {
|
||||||
|
debug!("Replication command processed");
|
||||||
|
Response::Success(None)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn update_indexes(
|
fn update_indexes(
|
||||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||||
key: String,
|
key: String,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user