Commit 4955d8c4d0 (parent 31f412934d): delete `futriix-server/src/old-server.rs`.
Diff hunk `@@ -1,376 +0,0 @@` — the 376 removed lines are reproduced below.
use serde::{Deserialize, Serialize};
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::path::Path;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
|
||||||
use crossbeam::queue::SegQueue;
|
|
||||||
use parking_lot::RwLock;
|
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
||||||
use tokio::time;
|
|
||||||
use log::{info, error, debug};
|
|
||||||
|
|
||||||
/// Wire-protocol commands exchanged as length-prefixed MessagePack
/// (`rmp_serde`) frames. Variant order is part of the encoded format —
/// do not reorder variants.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Command {
    // CRUD over the JSON document store.
    Insert { key: String, value: serde_json::Value },
    Get { key: String },
    Update { key: String, value: serde_json::Value },
    Delete { key: String },
    // Transaction markers. NOTE(review): `process_command` answers all three
    // with `Success(None)` without doing anything — confirm whether
    // transactions are still planned.
    BeginTransaction,
    CommitTransaction,
    RollbackTransaction,
    // Secondary-index maintenance on a document field.
    CreateIndex { field: String },
    DropIndex { field: String },
    // Run scripts/<script_name>.lua through the external `lua` binary.
    SysExec { script_name: String },
    // Peer-to-peer replication: a batch of (command, timestamp) pairs.
    Replicate { commands: Vec<(Command, u128)> },
    // Replication control. NOTE(review): handled as no-op `Success(None)` in
    // `process_command`; runtime toggling is not implemented there.
    ReplicationEnable,
    ReplicationDisable,
    ReplicationAddPeer { addr: String },
    ReplicationRemovePeer { addr: String },
    ReplicationStatus,
}
|
|
||||||
|
|
||||||
/// Wire-protocol reply, MessagePack-encoded back to the client with the same
/// u32-big-endian length prefix used for commands.
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
    /// Operation succeeded; the payload is present for reads (e.g. `Get`).
    Success(Option<serde_json::Value>),
    /// Operation failed; human-readable reason.
    Error(String),
    /// Snapshot of replication state.
    /// NOTE(review): `process_command` answers `Command::ReplicationStatus`
    /// with `Success(None)` and never constructs this variant — confirm
    /// whether it is built elsewhere or is dead wire format.
    ReplicationStatus {
        enabled: bool,
        peers: Vec<String>,
        last_sync: u128,
    },
}
|
|
||||||
|
|
||||||
/// In-memory JSON key/value server with optional peer replication.
///
/// Shared state lives behind `Arc`s, so the per-connection clones created in
/// `run` all operate on the same data.
pub struct FutriixServer {
    // Primary document store: key -> JSON value.
    db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
    // Lock-free queue of commands pending execution on a connection task.
    command_queue: Arc<SegQueue<Command>>,
    // Transaction snapshots. NOTE(review): never read or written in this
    // file — transactions are stubbed out in `process_command`.
    transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
    // Secondary indexes: field name -> (field value -> keys of documents
    // carrying that value).
    indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
    // Whether `run` starts the background replication sync task.
    replication_enabled: bool,
    // Peer addresses that history entries are replicated to.
    peer_nodes: Vec<String>,
    // Replication sync period, in milliseconds.
    sync_interval: u64,
    // Timestamp of the newest command already synced/applied (units not
    // established in this file; `Replicate` carries u128 — TODO confirm).
    last_sync_timestamp: Arc<RwLock<u128>>,
    // Log of (command, timestamp) pairs used as the replication source.
    command_history: Arc<RwLock<Vec<(Command, u128)>>>,
}
|
|
||||||
|
|
||||||
impl FutriixServer {
|
|
||||||
pub fn new(config: &toml::Value) -> Self {
|
|
||||||
debug!("Initializing server with config: {:?}", config);
|
|
||||||
|
|
||||||
let replication_enabled = config.get("replication")
|
|
||||||
.and_then(|r| r.get("enabled"))
|
|
||||||
.and_then(|v| v.as_bool())
|
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
let peer_nodes = config.get("replication")
|
|
||||||
.and_then(|r| r.get("peer_nodes"))
|
|
||||||
.and_then(|n| n.as_array())
|
|
||||||
.map(|nodes| {
|
|
||||||
nodes.iter()
|
|
||||||
.filter_map(|n| n.as_str().map(|s| s.to_string()))
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
.unwrap_or_else(Vec::new);
|
|
||||||
|
|
||||||
let sync_interval = config.get("replication")
|
|
||||||
.and_then(|r| r.get("sync_interval"))
|
|
||||||
.and_then(|i| i.as_integer())
|
|
||||||
.map(|i| i as u64)
|
|
||||||
.unwrap_or(1000);
|
|
||||||
|
|
||||||
FutriixServer {
|
|
||||||
db: Arc::new(RwLock::new(HashMap::new())),
|
|
||||||
command_queue: Arc::new(SegQueue::new()),
|
|
||||||
transactions: Arc::new(RwLock::new(Vec::new())),
|
|
||||||
indexes: Arc::new(RwLock::new(HashMap::new())),
|
|
||||||
replication_enabled,
|
|
||||||
peer_nodes,
|
|
||||||
sync_interval,
|
|
||||||
last_sync_timestamp: Arc::new(RwLock::new(0)),
|
|
||||||
command_history: Arc::new(RwLock::new(Vec::new())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
info!("Starting server on {}", addr);
|
|
||||||
let listener = TcpListener::bind(addr).await?;
|
|
||||||
|
|
||||||
info!("Futriix Server started successfully!");
|
|
||||||
info!("Listening on: {}", addr);
|
|
||||||
|
|
||||||
if self.replication_enabled {
|
|
||||||
info!("Replication enabled with peers: {:?}", self.peer_nodes);
|
|
||||||
let sync_task = self.start_replication_sync();
|
|
||||||
tokio::spawn(sync_task);
|
|
||||||
}
|
|
||||||
|
|
||||||
let scripts_dir = Path::new("scripts");
|
|
||||||
if !scripts_dir.exists() {
|
|
||||||
std::fs::create_dir(scripts_dir)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (socket, _) = listener.accept().await?;
|
|
||||||
let server = self.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
server.handle_connection(socket).await.unwrap_or_else(|e| {
|
|
||||||
error!("Connection error: {}", e);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_replication_sync(&self) -> impl std::future::Future<Output = ()> + 'static {
|
|
||||||
let peer_nodes = self.peer_nodes.clone();
|
|
||||||
let command_history = self.command_history.clone();
|
|
||||||
let last_sync_timestamp = self.last_sync_timestamp.clone();
|
|
||||||
let sync_interval = self.sync_interval;
|
|
||||||
|
|
||||||
async move {
|
|
||||||
let mut interval = time::interval(Duration::from_millis(sync_interval));
|
|
||||||
loop {
|
|
||||||
interval.tick().await;
|
|
||||||
|
|
||||||
let commands_to_sync = {
|
|
||||||
let history = command_history.read();
|
|
||||||
let last_sync = *last_sync_timestamp.read();
|
|
||||||
history.iter()
|
|
||||||
.filter(|(_, ts)| *ts > last_sync)
|
|
||||||
.map(|(cmd, ts)| (cmd.clone(), *ts))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
};
|
|
||||||
|
|
||||||
if !commands_to_sync.is_empty() {
|
|
||||||
for peer in &peer_nodes {
|
|
||||||
if let Ok(mut stream) = TcpStream::connect(peer).await {
|
|
||||||
let replicate_cmd = Command::Replicate {
|
|
||||||
commands: commands_to_sync.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) {
|
|
||||||
let len = cmd_bytes.len() as u32;
|
|
||||||
if stream.write_all(&len.to_be_bytes()).await.is_ok() {
|
|
||||||
let _ = stream.write_all(&cmd_bytes).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_connection(
|
|
||||||
&self,
|
|
||||||
mut socket: TcpStream,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
loop {
|
|
||||||
let mut len_buf = [0u8; 4];
|
|
||||||
socket.read_exact(&mut len_buf).await?;
|
|
||||||
let len = u32::from_be_bytes(len_buf) as usize;
|
|
||||||
|
|
||||||
let mut buf = vec![0u8; len];
|
|
||||||
socket.read_exact(&mut buf).await?;
|
|
||||||
|
|
||||||
let cmd: Command = rmp_serde::from_slice(&buf)?;
|
|
||||||
|
|
||||||
self.command_queue.push(cmd.clone());
|
|
||||||
|
|
||||||
while let Some(cmd) = self.command_queue.pop() {
|
|
||||||
if let Command::Replicate { commands } = cmd {
|
|
||||||
for (cmd, ts) in commands {
|
|
||||||
match cmd {
|
|
||||||
Command::Insert { key, value } | Command::Update { key, value } => {
|
|
||||||
let mut db = self.db.write();
|
|
||||||
if let Some(existing) = db.get(&key) {
|
|
||||||
let resolved = self.resolve_conflict(&key, existing, &value);
|
|
||||||
db.insert(key, resolved);
|
|
||||||
} else {
|
|
||||||
db.insert(key, value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let _ = Self::process_command(
|
|
||||||
&self.db,
|
|
||||||
&self.transactions,
|
|
||||||
&self.indexes,
|
|
||||||
cmd,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*self.last_sync_timestamp.write() = ts;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let response = Self::process_command(
|
|
||||||
&self.db,
|
|
||||||
&self.transactions,
|
|
||||||
&self.indexes,
|
|
||||||
cmd.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let response_bytes = rmp_serde::to_vec(&response)?;
|
|
||||||
let len = response_bytes.len() as u32;
|
|
||||||
socket.write_all(&len.to_be_bytes()).await?;
|
|
||||||
socket.write_all(&response_bytes).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn resolve_conflict(
|
|
||||||
&self,
|
|
||||||
_key: &str,
|
|
||||||
local_value: &serde_json::Value,
|
|
||||||
remote_value: &serde_json::Value,
|
|
||||||
) -> serde_json::Value {
|
|
||||||
let local_ts = local_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
||||||
let remote_ts = remote_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
||||||
|
|
||||||
if remote_ts > local_ts {
|
|
||||||
remote_value.clone()
|
|
||||||
} else {
|
|
||||||
local_value.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_command(
|
|
||||||
db: &Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
|
||||||
_transactions: &Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
|
||||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
|
||||||
cmd: Command,
|
|
||||||
) -> Response {
|
|
||||||
match cmd {
|
|
||||||
Command::Insert { key, value } => {
|
|
||||||
let mut db = db.write();
|
|
||||||
db.insert(key.clone(), value.clone());
|
|
||||||
Self::update_indexes(indexes, key, &value);
|
|
||||||
Response::Success(None)
|
|
||||||
},
|
|
||||||
Command::Get { key } => {
|
|
||||||
let db = db.read();
|
|
||||||
match db.get(&key) {
|
|
||||||
Some(value) => Response::Success(Some(value.clone())),
|
|
||||||
None => Response::Error("Key not found".to_string()),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::Update { key, value } => {
|
|
||||||
let mut db = db.write();
|
|
||||||
if db.contains_key(&key) {
|
|
||||||
db.insert(key.clone(), value.clone());
|
|
||||||
Self::update_indexes(indexes, key, &value);
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Key not found".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::Delete { key } => {
|
|
||||||
let mut db = db.write();
|
|
||||||
if db.remove(&key).is_some() {
|
|
||||||
Self::remove_from_indexes(indexes, key);
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Key not found".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::BeginTransaction => {
|
|
||||||
Response::Success(None)
|
|
||||||
},
|
|
||||||
Command::CommitTransaction => {
|
|
||||||
Response::Success(None)
|
|
||||||
},
|
|
||||||
Command::RollbackTransaction => {
|
|
||||||
Response::Success(None)
|
|
||||||
},
|
|
||||||
Command::CreateIndex { field } => {
|
|
||||||
let mut indexes = indexes.write();
|
|
||||||
if !indexes.contains_key(&field) {
|
|
||||||
indexes.insert(field, HashMap::new());
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Index already exists".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::DropIndex { field } => {
|
|
||||||
let mut indexes = indexes.write();
|
|
||||||
if indexes.remove(&field).is_some() {
|
|
||||||
Response::Success(None)
|
|
||||||
} else {
|
|
||||||
Response::Error("Index not found".to_string())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::SysExec { script_name } => {
|
|
||||||
#[allow(unused_variables)]
|
|
||||||
match Self::execute_lua_script(&script_name) {
|
|
||||||
Ok(output) => Response::Success(Some(output.into())),
|
|
||||||
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::ReplicationEnable => Response::Success(None),
|
|
||||||
Command::ReplicationDisable => Response::Success(None),
|
|
||||||
Command::ReplicationAddPeer { addr: _ } => Response::Success(None),
|
|
||||||
Command::ReplicationRemovePeer { addr: _ } => Response::Success(None),
|
|
||||||
Command::ReplicationStatus => Response::Success(None),
|
|
||||||
Command::Replicate { .. } => Response::Success(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update_indexes(
|
|
||||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
|
||||||
key: String,
|
|
||||||
value: &serde_json::Value,
|
|
||||||
) {
|
|
||||||
let mut indexes = indexes.write();
|
|
||||||
for (field, index) in indexes.iter_mut() {
|
|
||||||
if let Some(field_value) = value.get(field) {
|
|
||||||
index.entry(field_value.clone())
|
|
||||||
.or_insert_with(Vec::new)
|
|
||||||
.push(key.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_from_indexes(
|
|
||||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
|
||||||
key: String,
|
|
||||||
) {
|
|
||||||
let mut indexes = indexes.write();
|
|
||||||
for index in indexes.values_mut() {
|
|
||||||
for entries in index.values_mut() {
|
|
||||||
if let Some(pos) = entries.iter().position(|k| k == &key) {
|
|
||||||
entries.remove(pos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(dead_code)]
/// Run `scripts/<script_name>.lua` with the external `lua` interpreter and
/// return its stdout on success, or stderr (or the spawn error) as `Err`.
///
/// # Errors
/// Fails if the name is invalid, the process cannot be spawned, the script
/// exits non-zero, or its output is not valid UTF-8.
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
    // SECURITY FIX: `script_name` arrives over the network via
    // `Command::SysExec`. Reject anything that is not a bare file name so a
    // client cannot escape the scripts/ directory with separators or `..`
    // (e.g. "../../etc/cron.d/evil").
    if script_name.is_empty()
        || script_name.contains('/')
        || script_name.contains('\\')
        || script_name.contains("..")
    {
        return Err(format!("Invalid script name: {}", script_name).into());
    }

    let script_path = Path::new("scripts").join(script_name).with_extension("lua");
    let output = std::process::Command::new("lua")
        .arg(&script_path)
        .output()?;

    if output.status.success() {
        Ok(String::from_utf8(output.stdout)?)
    } else {
        Err(String::from_utf8(output.stderr)?.into())
    }
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Clone for FutriixServer {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
FutriixServer {
|
|
||||||
db: self.db.clone(),
|
|
||||||
command_queue: self.command_queue.clone(),
|
|
||||||
transactions: self.transactions.clone(),
|
|
||||||
indexes: self.indexes.clone(),
|
|
||||||
replication_enabled: self.replication_enabled,
|
|
||||||
peer_nodes: self.peer_nodes.clone(),
|
|
||||||
sync_interval: self.sync_interval,
|
|
||||||
last_sync_timestamp: self.last_sync_timestamp.clone(),
|
|
||||||
command_history: self.command_history.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user