Delete futriix/src/server.rs
This commit is contained in:
parent
b7d4d0272b
commit
d08b6b3020
@ -1,866 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use crossbeam::queue::SegQueue;
|
||||
use parking_lot::RwLock;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::time;
|
||||
use log::{info, error, debug, warn};
|
||||
use warp::Filter;
|
||||
use rlua::Lua;
|
||||
|
||||
// Custom B*-tree implementation for time-series data
|
||||
mod bstar_tree {
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
const MIN_DEGREE: usize = 2;
|
||||
const MAX_KEYS: usize = 2 * MIN_DEGREE - 1;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct BStarTree<K: Ord + Clone, V: Clone> {
|
||||
root: Node<K, V>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
enum Node<K: Ord + Clone, V: Clone> {
|
||||
Internal {
|
||||
keys: Vec<K>,
|
||||
children: Vec<Node<K, V>>,
|
||||
},
|
||||
Leaf {
|
||||
keys: Vec<K>,
|
||||
values: Vec<V>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<K: Ord + Clone, V: Clone> BStarTree<K, V> {
|
||||
pub fn new() -> Self {
|
||||
BStarTree {
|
||||
root: Node::Leaf {
|
||||
keys: Vec::new(),
|
||||
values: Vec::new(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
|
||||
let (new_root, old_val) = self.root.insert(key, value);
|
||||
if let Some(new_root) = new_root {
|
||||
self.root = Node::Internal {
|
||||
keys: vec![new_root.0],
|
||||
children: vec![new_root.1, new_root.2],
|
||||
};
|
||||
}
|
||||
old_val
|
||||
}
|
||||
|
||||
pub fn range(&self, start: &K, end: &K) -> Vec<(&K, &V)> {
|
||||
self.root.range(start, end)
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, key: &K) -> Option<V> {
|
||||
let (new_root, old_val) = self.root.remove(key);
|
||||
if let Some(new_root) = new_root {
|
||||
self.root = new_root;
|
||||
}
|
||||
old_val
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Ord + Clone, V: Clone> Node<K, V> {
|
||||
fn insert(&mut self, key: K, value: V) -> (Option<(K, Node<K, V>, Node<K, V>)>, Option<V>) {
|
||||
match self {
|
||||
Node::Leaf { keys, values } => {
|
||||
match keys.binary_search(&key) {
|
||||
Ok(idx) => {
|
||||
let old_val = values[idx].clone();
|
||||
values[idx] = value;
|
||||
(None, Some(old_val))
|
||||
}
|
||||
Err(idx) => {
|
||||
keys.insert(idx, key);
|
||||
values.insert(idx, value);
|
||||
|
||||
if keys.len() > MAX_KEYS {
|
||||
let split_idx = keys.len() / 2;
|
||||
let split_key = keys[split_idx].clone();
|
||||
|
||||
let right_keys = keys.split_off(split_idx + 1);
|
||||
let right_values = values.split_off(split_idx + 1);
|
||||
|
||||
let right_node = Node::Leaf {
|
||||
keys: right_keys,
|
||||
values: right_values,
|
||||
};
|
||||
|
||||
let left_node = Node::Leaf {
|
||||
keys: keys.clone(),
|
||||
values: values.clone(),
|
||||
};
|
||||
|
||||
(Some((split_key, left_node, right_node)), None)
|
||||
} else {
|
||||
(None, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Node::Internal { keys, children } => {
|
||||
let idx = match keys.binary_search(&key) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
};
|
||||
|
||||
let (split, old_val) = children[idx].insert(key, value);
|
||||
|
||||
if let Some((split_key, left, right)) = split {
|
||||
keys.insert(idx, split_key);
|
||||
children[idx] = left;
|
||||
children.insert(idx + 1, right);
|
||||
|
||||
if keys.len() > MAX_KEYS {
|
||||
let split_idx = keys.len() / 2;
|
||||
let split_key = keys[split_idx].clone();
|
||||
|
||||
let right_keys = keys.split_off(split_idx + 1);
|
||||
let right_children = children.split_off(split_idx + 1);
|
||||
|
||||
let right_node = Node::Internal {
|
||||
keys: right_keys,
|
||||
children: right_children,
|
||||
};
|
||||
|
||||
let left_node = Node::Internal {
|
||||
keys: keys.clone(),
|
||||
children: children.clone(),
|
||||
};
|
||||
|
||||
(Some((split_key, left_node, right_node)), old_val)
|
||||
} else {
|
||||
(None, old_val)
|
||||
}
|
||||
} else {
|
||||
(None, old_val)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn range(&self, start: &K, end: &K) -> Vec<(&K, &V)> {
|
||||
let mut result = Vec::new();
|
||||
self.collect_range(start, end, &mut result);
|
||||
result
|
||||
}
|
||||
|
||||
fn collect_range<'a>(&'a self, start: &K, end: &K, result: &mut Vec<(&'a K, &'a V)>) {
|
||||
match self {
|
||||
Node::Leaf { keys, values } => {
|
||||
let start_idx = match keys.binary_search(start) {
|
||||
Ok(idx) => idx,
|
||||
Err(idx) => idx,
|
||||
};
|
||||
|
||||
let end_idx = match keys.binary_search(end) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
};
|
||||
|
||||
for i in start_idx..end_idx.min(keys.len()) {
|
||||
result.push((&keys[i], &values[i]));
|
||||
}
|
||||
}
|
||||
Node::Internal { keys, children } => {
|
||||
let mut idx = 0;
|
||||
while idx < keys.len() && keys[idx] < *start {
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
if idx > 0 {
|
||||
children[idx - 1].collect_range(start, end, result);
|
||||
}
|
||||
|
||||
while idx < keys.len() && keys[idx] <= *end {
|
||||
children[idx].collect_range(start, end, result);
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
if idx == keys.len() {
|
||||
children[idx].collect_range(start, end, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove(&mut self, key: &K) -> (Option<Node<K, V>>, Option<V>) {
|
||||
match self {
|
||||
Node::Leaf { keys, values } => {
|
||||
if let Ok(idx) = keys.binary_search(key) {
|
||||
let old_val = values.remove(idx);
|
||||
keys.remove(idx);
|
||||
|
||||
if keys.len() < MIN_DEGREE - 1 {
|
||||
(Some(Node::Leaf {
|
||||
keys: keys.clone(),
|
||||
values: values.clone(),
|
||||
}), Some(old_val))
|
||||
} else {
|
||||
(None, Some(old_val))
|
||||
}
|
||||
} else {
|
||||
(None, None)
|
||||
}
|
||||
}
|
||||
Node::Internal { keys, children } => {
|
||||
let idx = match keys.binary_search(key) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
};
|
||||
|
||||
let (new_child, old_val) = children[idx].remove(key);
|
||||
|
||||
if let Some(new_child) = new_child {
|
||||
if let Node::Leaf { keys: new_keys, values: new_values } = new_child {
|
||||
if idx > 0 {
|
||||
// Merge with left sibling
|
||||
if let Node::Leaf { keys: left_keys, values: left_values } = &mut children[idx - 1] {
|
||||
left_keys.extend(new_keys);
|
||||
left_values.extend(new_values);
|
||||
keys.remove(idx - 1);
|
||||
children.remove(idx);
|
||||
}
|
||||
} else if idx + 1 < children.len() {
|
||||
// Merge with right sibling
|
||||
if let Node::Leaf { keys: right_keys, values: right_values } = &mut children[idx + 1] {
|
||||
right_keys.splice(0..0, new_keys);
|
||||
right_values.splice(0..0, new_values);
|
||||
keys.remove(idx);
|
||||
children.remove(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if keys.len() < MIN_DEGREE - 1 {
|
||||
if keys.is_empty() {
|
||||
(Some(children.remove(0)), old_val)
|
||||
} else {
|
||||
(Some(Node::Internal {
|
||||
keys: keys.clone(),
|
||||
children: children.clone(),
|
||||
}), old_val)
|
||||
}
|
||||
} else {
|
||||
(None, old_val)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub enum Command {
|
||||
Insert { metric: String, timestamp: u128, value: serde_json::Value },
|
||||
GetRange { metric: String, start: u128, end: u128 },
|
||||
GetLatest { metric: String },
|
||||
DeleteRange { metric: String, start: u128, end: u128 },
|
||||
BeginTransaction,
|
||||
CommitTransaction(u64),
|
||||
RollbackTransaction(u64),
|
||||
CreateIndex { field: String },
|
||||
DropIndex { field: String },
|
||||
SysExec { script_name: String },
|
||||
Replicate { commands: Vec<(Command, u128)> },
|
||||
ReplicationEnable,
|
||||
ReplicationDisable,
|
||||
ReplicationAddPeer { addr: String },
|
||||
ReplicationRemovePeer { addr: String },
|
||||
ReplicationStatus,
|
||||
BackupCreate { path: String },
|
||||
BackupRestore { path: String },
|
||||
LuaExec { code: String },
|
||||
Halt,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Response {
|
||||
Success(Option<serde_json::Value>),
|
||||
Error(String),
|
||||
ReplicationStatus {
|
||||
enabled: bool,
|
||||
peers: Vec<String>,
|
||||
last_sync: u128,
|
||||
},
|
||||
BackupStatus { success: bool, message: String },
|
||||
LuaOutput(String),
|
||||
}
|
||||
|
||||
pub struct FutriixServer {
|
||||
ts_db: Arc<RwLock<HashMap<String, bstar_tree::BStarTree<u128, serde_json::Value>>>>,
|
||||
command_queue: Arc<SegQueue<Command>>,
|
||||
transactions: Arc<RwLock<HashMap<u64, HashMap<String, Vec<(u128, serde_json::Value)>>>>>,
|
||||
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
replication_enabled: bool,
|
||||
peer_nodes: Vec<String>,
|
||||
sync_interval: u64,
|
||||
last_sync_timestamp: Arc<RwLock<u128>>,
|
||||
command_history: Arc<RwLock<Vec<(Command, u128)>>>,
|
||||
next_tx_id: Arc<RwLock<u64>>,
|
||||
shutdown: Arc<RwLock<bool>>,
|
||||
}
|
||||
|
||||
impl FutriixServer {
|
||||
pub fn new(config: &toml::Value) -> Self {
|
||||
debug!("Initializing server with config: {:?}", config);
|
||||
|
||||
let replication_enabled = config.get("replication")
|
||||
.and_then(|r| r.get("enabled"))
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
let peer_nodes = config.get("replication")
|
||||
.and_then(|r| r.get("peer_nodes"))
|
||||
.and_then(|n| n.as_array())
|
||||
.map(|nodes| {
|
||||
nodes.iter()
|
||||
.filter_map(|n| n.as_str().map(|s| s.to_string()))
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let sync_interval = config.get("replication")
|
||||
.and_then(|r| r.get("sync_interval"))
|
||||
.and_then(|i| i.as_integer())
|
||||
.map(|i| i as u64)
|
||||
.unwrap_or(1000);
|
||||
|
||||
FutriixServer {
|
||||
ts_db: Arc::new(RwLock::new(HashMap::new())),
|
||||
command_queue: Arc::new(SegQueue::new()),
|
||||
transactions: Arc::new(RwLock::new(HashMap::new())),
|
||||
indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||
replication_enabled,
|
||||
peer_nodes,
|
||||
sync_interval,
|
||||
last_sync_timestamp: Arc::new(RwLock::new(0)),
|
||||
command_history: Arc::new(RwLock::new(Vec::new())),
|
||||
next_tx_id: Arc::new(RwLock::new(1)),
|
||||
shutdown: Arc::new(RwLock::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Starting server on {}", addr);
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
let http_port = addr.split(':').last().unwrap_or("8080").parse::<u16>().unwrap_or(8080) + 1;
|
||||
let http_addr = format!("0.0.0.0:{}", http_port);
|
||||
self.start_http_api(http_addr.clone());
|
||||
|
||||
info!("Futriix Server started successfully!");
|
||||
info!("Listening on: {}", addr);
|
||||
info!("HTTP API available on: {}", http_addr);
|
||||
|
||||
if self.replication_enabled {
|
||||
info!("Replication enabled with peers: {:?}", self.peer_nodes);
|
||||
let sync_task = self.start_replication_sync();
|
||||
tokio::spawn(sync_task);
|
||||
}
|
||||
|
||||
let scripts_dir = Path::new("scripts");
|
||||
if !scripts_dir.exists() {
|
||||
if let Err(e) = std::fs::create_dir(scripts_dir) {
|
||||
warn!("Failed to create scripts directory: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
let backups_dir = Path::new("backups");
|
||||
if !backups_dir.exists() {
|
||||
if let Err(e) = std::fs::create_dir(backups_dir) {
|
||||
warn!("Failed to create backups directory: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
if *self.shutdown.read() {
|
||||
info!("Shutdown signal received, stopping server");
|
||||
break;
|
||||
}
|
||||
|
||||
match listener.accept().await {
|
||||
Ok((socket, _)) => {
|
||||
let server = self.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = server.handle_connection(socket).await {
|
||||
error!("Connection error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Accept error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_http_api(&self, addr: String) {
|
||||
let ts_db = self.ts_db.clone();
|
||||
let command_queue_insert = self.command_queue.clone();
|
||||
let _command_queue_get = self.command_queue.clone();
|
||||
let command_queue_delete = self.command_queue.clone();
|
||||
|
||||
let ts_db_clone = ts_db.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let get = warp::path!("api" / "get" / String / u128 / u128)
|
||||
.and(warp::get())
|
||||
.map(move |metric: String, start: u128, end: u128| {
|
||||
let db = ts_db.read();
|
||||
match db.get(&metric) {
|
||||
Some(tree) => {
|
||||
let data = tree.range(&start, &end);
|
||||
warp::reply::json(&data)
|
||||
},
|
||||
None => warp::reply::json(&Vec::<(u128, serde_json::Value)>::new()),
|
||||
}
|
||||
});
|
||||
|
||||
let get_latest = warp::path!("api" / "latest" / String)
|
||||
.and(warp::get())
|
||||
.map(move |metric: String| {
|
||||
let db = ts_db_clone.read();
|
||||
match db.get(&metric) {
|
||||
Some(tree) => {
|
||||
if let Some((ts, val)) = tree.range(&0, &u128::MAX).last() {
|
||||
warp::reply::json(&(*ts, (*val).clone()))
|
||||
} else {
|
||||
warp::reply::json(&(0, serde_json::Value::Null))
|
||||
}
|
||||
},
|
||||
None => warp::reply::json(&(0, serde_json::Value::Null)),
|
||||
}
|
||||
});
|
||||
|
||||
let insert = warp::path!("api" / "insert")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.map(move |data: HashMap<String, (u128, serde_json::Value)>| {
|
||||
for (metric, (timestamp, value)) in data {
|
||||
command_queue_insert.push(Command::Insert {
|
||||
metric,
|
||||
timestamp,
|
||||
value
|
||||
});
|
||||
}
|
||||
warp::reply::json(&serde_json::json!({"status": "ok"}))
|
||||
});
|
||||
|
||||
let delete = warp::path!("api" / "delete" / String / u128 / u128)
|
||||
.and(warp::delete())
|
||||
.map(move |metric: String, start: u128, end: u128| {
|
||||
command_queue_delete.push(Command::DeleteRange {
|
||||
metric,
|
||||
start,
|
||||
end
|
||||
});
|
||||
warp::reply::json(&serde_json::json!({"status": "ok"}))
|
||||
});
|
||||
|
||||
let routes = get.or(get_latest).or(insert).or(delete);
|
||||
|
||||
warp::serve(routes)
|
||||
.run(addr.parse::<std::net::SocketAddr>().unwrap())
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
fn start_replication_sync(&self) -> impl std::future::Future<Output = ()> + 'static {
|
||||
let peer_nodes = self.peer_nodes.clone();
|
||||
let command_history = self.command_history.clone();
|
||||
let last_sync_timestamp = self.last_sync_timestamp.clone();
|
||||
let sync_interval = self.sync_interval;
|
||||
|
||||
async move {
|
||||
let mut interval = time::interval(Duration::from_millis(sync_interval));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let commands_to_sync = {
|
||||
let history = command_history.read();
|
||||
let last_sync = *last_sync_timestamp.read();
|
||||
history.iter()
|
||||
.filter(|(_, ts)| *ts > last_sync)
|
||||
.map(|(cmd, ts)| (cmd.clone(), *ts))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
if !commands_to_sync.is_empty() {
|
||||
for peer in &peer_nodes {
|
||||
if let Ok(mut stream) = TcpStream::connect(peer).await {
|
||||
let replicate_cmd = Command::Replicate {
|
||||
commands: commands_to_sync.clone(),
|
||||
};
|
||||
|
||||
if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) {
|
||||
let len = cmd_bytes.len() as u32;
|
||||
if let Err(e) = stream.write_all(&len.to_be_bytes()).await {
|
||||
warn!("Failed to write command length to peer {}: {}", peer, e);
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = stream.write_all(&cmd_bytes).await {
|
||||
warn!("Failed to write command to peer {}: {}", peer, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
&self,
|
||||
mut socket: TcpStream,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
loop {
|
||||
let mut len_buf = [0u8; 4];
|
||||
if let Err(e) = socket.read_exact(&mut len_buf).await {
|
||||
error!("Failed to read command length: {}", e);
|
||||
break;
|
||||
}
|
||||
let len = u32::from_be_bytes(len_buf) as usize;
|
||||
|
||||
let mut buf = vec![0u8; len];
|
||||
if let Err(e) = socket.read_exact(&mut buf).await {
|
||||
error!("Failed to read command: {}", e);
|
||||
break;
|
||||
}
|
||||
|
||||
let cmd: Command = match rmp_serde::from_slice(&buf) {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) => {
|
||||
error!("Failed to deserialize command: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
self.command_queue.push(cmd.clone());
|
||||
|
||||
while let Some(cmd) = self.command_queue.pop() {
|
||||
if let Command::Replicate { commands } = cmd {
|
||||
for (cmd, ts) in commands {
|
||||
match cmd {
|
||||
Command::Insert { metric, timestamp, value } => {
|
||||
let mut db = self.ts_db.write();
|
||||
let tree = db.entry(metric)
|
||||
.or_insert_with(bstar_tree::BStarTree::new);
|
||||
tree.insert(timestamp, value);
|
||||
}
|
||||
Command::DeleteRange { metric, start, end } => {
|
||||
let mut db = self.ts_db.write();
|
||||
if let Some(tree) = db.get_mut(&metric) {
|
||||
for ts in start..=end {
|
||||
tree.remove(&ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let _ = Self::process_command(
|
||||
&self.ts_db,
|
||||
&self.transactions,
|
||||
&self.indexes,
|
||||
&self.next_tx_id,
|
||||
&self.shutdown,
|
||||
cmd,
|
||||
);
|
||||
}
|
||||
}
|
||||
*self.last_sync_timestamp.write() = ts;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let response = Self::process_command(
|
||||
&self.ts_db,
|
||||
&self.transactions,
|
||||
&self.indexes,
|
||||
&self.next_tx_id,
|
||||
&self.shutdown,
|
||||
cmd.clone(),
|
||||
);
|
||||
|
||||
let response_bytes = match rmp_serde::to_vec(&response) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
error!("Failed to serialize response: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
let len = response_bytes.len() as u32;
|
||||
if let Err(e) = socket.write_all(&len.to_be_bytes()).await {
|
||||
error!("Failed to write response length: {}", e);
|
||||
break;
|
||||
}
|
||||
if let Err(e) = socket.write_all(&response_bytes).await {
|
||||
error!("Failed to write response: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_command(
|
||||
ts_db: &Arc<RwLock<HashMap<String, bstar_tree::BStarTree<u128, serde_json::Value>>>>,
|
||||
transactions: &Arc<RwLock<HashMap<u64, HashMap<String, Vec<(u128, serde_json::Value)>>>>>,
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
next_tx_id: &Arc<RwLock<u64>>,
|
||||
shutdown: &Arc<RwLock<bool>>,
|
||||
cmd: Command,
|
||||
) -> Response {
|
||||
match cmd {
|
||||
Command::Insert { metric, timestamp, value } => {
|
||||
let mut db = ts_db.write();
|
||||
let tree = db.entry(metric.clone())
|
||||
.or_insert_with(bstar_tree::BStarTree::new);
|
||||
tree.insert(timestamp, value.clone());
|
||||
|
||||
Self::update_indexes(indexes, metric, &value);
|
||||
Response::Success(None)
|
||||
},
|
||||
Command::GetRange { metric, start, end } => {
|
||||
let db = ts_db.read();
|
||||
match db.get(&metric) {
|
||||
Some(tree) => {
|
||||
let data = tree.range(&start, &end);
|
||||
Response::Success(Some(serde_json::json!(data)))
|
||||
},
|
||||
None => Response::Error("Metric not found".to_string()),
|
||||
}
|
||||
},
|
||||
Command::GetLatest { metric } => {
|
||||
let db = ts_db.read();
|
||||
match db.get(&metric) {
|
||||
Some(tree) => {
|
||||
if let Some((ts, val)) = tree.range(&0, &u128::MAX).last() {
|
||||
Response::Success(Some(serde_json::json!({
|
||||
"timestamp": ts,
|
||||
"value": val
|
||||
})))
|
||||
} else {
|
||||
Response::Error("No data for metric".to_string())
|
||||
}
|
||||
},
|
||||
None => Response::Error("Metric not found".to_string()),
|
||||
}
|
||||
},
|
||||
Command::DeleteRange { metric, start, end } => {
|
||||
let mut db = ts_db.write();
|
||||
if let Some(tree) = db.get_mut(&metric) {
|
||||
for ts in start..=end {
|
||||
tree.remove(&ts);
|
||||
}
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Metric not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::BeginTransaction => {
|
||||
let tx_id = {
|
||||
let mut next_id = next_tx_id.write();
|
||||
let id = *next_id;
|
||||
*next_id += 1;
|
||||
id
|
||||
};
|
||||
|
||||
let mut transactions = transactions.write();
|
||||
transactions.insert(tx_id, HashMap::new());
|
||||
Response::Success(Some(serde_json::json!(tx_id)))
|
||||
},
|
||||
Command::CommitTransaction(tx_id) => {
|
||||
let mut db = ts_db.write();
|
||||
let mut tx_map = transactions.write();
|
||||
|
||||
if let Some(tx_data) = tx_map.remove(&tx_id) {
|
||||
for (metric, data_points) in tx_data {
|
||||
let tree = db.entry(metric.clone())
|
||||
.or_insert_with(bstar_tree::BStarTree::new);
|
||||
for (ts, val) in data_points {
|
||||
tree.insert(ts, val.clone());
|
||||
Self::update_indexes(indexes, metric.clone(), &val);
|
||||
}
|
||||
}
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Transaction not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::RollbackTransaction(tx_id) => {
|
||||
let mut tx_map = transactions.write();
|
||||
if tx_map.remove(&tx_id).is_some() {
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Transaction not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::CreateIndex { field } => {
|
||||
let mut idx = indexes.write();
|
||||
if !idx.contains_key(&field) {
|
||||
idx.insert(field, HashMap::new());
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Index already exists".to_string())
|
||||
}
|
||||
},
|
||||
Command::DropIndex { field } => {
|
||||
let mut idx = indexes.write();
|
||||
if idx.remove(&field).is_some() {
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Index not found".to_string())
|
||||
}
|
||||
},
|
||||
Command::SysExec { script_name } => {
|
||||
match Self::execute_lua_script(&script_name) {
|
||||
Ok(output) => Response::Success(Some(serde_json::json!(output))),
|
||||
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
|
||||
}
|
||||
},
|
||||
Command::LuaExec { code } => {
|
||||
match Self::execute_lua_code(&code) {
|
||||
Ok(output) => Response::LuaOutput(output),
|
||||
Err(e) => Response::Error(format!("Lua execution failed: {}", e)),
|
||||
}
|
||||
},
|
||||
Command::ReplicationEnable => Response::Success(None),
|
||||
Command::ReplicationDisable => Response::Success(None),
|
||||
Command::ReplicationAddPeer { addr: _ } => Response::Success(None),
|
||||
Command::ReplicationRemovePeer { addr: _ } => Response::Success(None),
|
||||
Command::ReplicationStatus => Response::Success(None),
|
||||
Command::Replicate { .. } => Response::Success(None),
|
||||
Command::BackupCreate { path } => {
|
||||
match Self::create_backup(ts_db, &path) {
|
||||
Ok(_) => Response::BackupStatus {
|
||||
success: true,
|
||||
message: format!("Backup created successfully at {}", path),
|
||||
},
|
||||
Err(e) => Response::BackupStatus {
|
||||
success: false,
|
||||
message: format!("Backup failed: {}", e),
|
||||
},
|
||||
}
|
||||
},
|
||||
Command::BackupRestore { path } => {
|
||||
match Self::restore_backup(ts_db, indexes, &path) {
|
||||
Ok(_) => Response::BackupStatus {
|
||||
success: true,
|
||||
message: format!("Backup restored successfully from {}", path),
|
||||
},
|
||||
Err(e) => Response::BackupStatus {
|
||||
success: false,
|
||||
message: format!("Restore failed: {}", e),
|
||||
},
|
||||
}
|
||||
},
|
||||
Command::Halt => {
|
||||
*shutdown.write() = true;
|
||||
Response::Success(None)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_lua_code(code: &str) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let lua = Lua::new();
|
||||
let result = lua.load(code).eval::<rlua::String>()?;
|
||||
Ok(result.to_str()?.to_string())
|
||||
}
|
||||
|
||||
fn create_backup(
|
||||
ts_db: &Arc<RwLock<HashMap<String, bstar_tree::BStarTree<u128, serde_json::Value>>>>,
|
||||
path: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let db = ts_db.read();
|
||||
let serialized = serde_json::to_string(&*db)?;
|
||||
std::fs::write(path, serialized)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_backup(
|
||||
ts_db: &Arc<RwLock<HashMap<String, bstar_tree::BStarTree<u128, serde_json::Value>>>>,
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
path: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let data = std::fs::read_to_string(path)?;
|
||||
let restored: HashMap<String, bstar_tree::BStarTree<u128, serde_json::Value>> = serde_json::from_str(&data)?;
|
||||
|
||||
let mut db = ts_db.write();
|
||||
*db = restored;
|
||||
|
||||
let mut idx = indexes.write();
|
||||
idx.clear();
|
||||
|
||||
for (metric, tree) in db.iter() {
|
||||
for (_, val) in tree.range(&0, &u128::MAX) {
|
||||
for (field, index) in idx.iter_mut() {
|
||||
if let Some(field_value) = val.get(field) {
|
||||
index.entry(field_value.clone())
|
||||
.or_default()
|
||||
.push(metric.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_indexes(
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
metric: String,
|
||||
value: &serde_json::Value,
|
||||
) {
|
||||
let mut idx = indexes.write();
|
||||
for (field, index) in idx.iter_mut() {
|
||||
if let Some(field_value) = value.get(field) {
|
||||
index.entry(field_value.clone())
|
||||
.or_default()
|
||||
.push(metric.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let script_path = Path::new("scripts").join(script_name).with_extension("lua");
|
||||
let output = std::process::Command::new("lua")
|
||||
.arg(&script_path)
|
||||
.output()?;
|
||||
|
||||
if output.status.success() {
|
||||
Ok(String::from_utf8(output.stdout)?)
|
||||
} else {
|
||||
Err(String::from_utf8(output.stderr)?.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for FutriixServer {
|
||||
fn clone(&self) -> Self {
|
||||
FutriixServer {
|
||||
ts_db: self.ts_db.clone(),
|
||||
command_queue: self.command_queue.clone(),
|
||||
transactions: self.transactions.clone(),
|
||||
indexes: self.indexes.clone(),
|
||||
replication_enabled: self.replication_enabled,
|
||||
peer_nodes: self.peer_nodes.clone(),
|
||||
sync_interval: self.sync_interval,
|
||||
last_sync_timestamp: self.last_sync_timestamp.clone(),
|
||||
command_history: self.command_history.clone(),
|
||||
next_tx_id: self.next_tx_id.clone(),
|
||||
shutdown: self.shutdown.clone(),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user