Upload files to "futriix-server/src"
This commit is contained in:
parent
0579a51262
commit
628836f6b9
268
futriix-server/src/server.rs
Normal file
268
futriix-server/src/server.rs
Normal file
@ -0,0 +1,268 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use crossbeam::queue::SegQueue;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use log::{info, error};
|
||||
use colored::Colorize;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Command {
|
||||
Insert { key: String, value: serde_json::Value },
|
||||
Get { key: String },
|
||||
Update { key: String, value: serde_json::Value },
|
||||
Delete { key: String },
|
||||
BeginTransaction,
|
||||
CommitTransaction,
|
||||
RollbackTransaction,
|
||||
CreateIndex { field: String },
|
||||
DropIndex { field: String },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Response {
|
||||
Success(Option<serde_json::Value>),
|
||||
Error(String),
|
||||
}
|
||||
|
||||
pub struct FutriixServer {
|
||||
db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
||||
command_queue: Arc<SegQueue<Command>>,
|
||||
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
||||
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
}
|
||||
|
||||
impl FutriixServer {
|
||||
pub fn new() -> Self {
|
||||
FutriixServer {
|
||||
db: Arc::new(RwLock::new(HashMap::new())),
|
||||
command_queue: Arc::new(SegQueue::new()),
|
||||
transactions: Arc::new(RwLock::new(Vec::new())),
|
||||
indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
println!();
|
||||
info!("{}", "Futriix Server started successfully!".bright_cyan());
|
||||
info!("Listening on: {}", addr);
|
||||
info!("Log file: futriix.log");
|
||||
|
||||
loop {
|
||||
let (socket, _) = listener.accept().await?;
|
||||
let db = self.db.clone();
|
||||
let queue = self.command_queue.clone();
|
||||
let transactions = self.transactions.clone();
|
||||
let indexes = self.indexes.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await {
|
||||
error!("Connection error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
mut socket: TcpStream,
|
||||
db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
|
||||
queue: Arc<SegQueue<Command>>,
|
||||
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
|
||||
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
loop {
|
||||
let mut len_buf = [0u8; 4];
|
||||
if let Err(e) = socket.read_exact(&mut len_buf).await {
|
||||
if e.kind() == std::io::ErrorKind::UnexpectedEof {
|
||||
break;
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
let len = u32::from_be_bytes(len_buf) as usize;
|
||||
|
||||
let mut buf = vec![0u8; len];
|
||||
socket.read_exact(&mut buf).await?;
|
||||
|
||||
let cmd: Command = rmp_serde::from_slice(&buf)?;
|
||||
queue.push(cmd);
|
||||
|
||||
while let Some(cmd) = queue.pop() {
|
||||
let response = match cmd {
|
||||
Command::Insert { key, value } => {
|
||||
let mut tx_data = None;
|
||||
{
|
||||
let mut transactions = transactions.write();
|
||||
if let Some(tx) = transactions.last_mut() {
|
||||
tx.insert(key.clone(), value.clone());
|
||||
tx_data = Some((key.clone(), value.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
if tx_data.is_none() {
|
||||
let mut db = db.write();
|
||||
db.insert(key.clone(), value.clone());
|
||||
Self::update_indexes(&indexes, &key, &value);
|
||||
}
|
||||
Response::Success(None)
|
||||
},
|
||||
Command::Get { key } => {
|
||||
let db = db.read();
|
||||
match db.get(&key) {
|
||||
Some(value) => Response::Success(Some(value.clone())),
|
||||
None => Response::Error("Key not found".to_string()),
|
||||
}
|
||||
},
|
||||
Command::Update { key, value } => {
|
||||
let mut tx_data = None;
|
||||
{
|
||||
let mut transactions = transactions.write();
|
||||
if let Some(tx) = transactions.last_mut() {
|
||||
tx.insert(key.clone(), value.clone());
|
||||
tx_data = Some((key.clone(), value.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
if tx_data.is_none() {
|
||||
let mut db = db.write();
|
||||
if db.contains_key(&key) {
|
||||
db.insert(key.clone(), value.clone());
|
||||
Self::update_indexes(&indexes, &key, &value);
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Key not found".to_string())
|
||||
}
|
||||
} else {
|
||||
Response::Success(None)
|
||||
}
|
||||
},
|
||||
Command::Delete { key } => {
|
||||
let mut tx_data = None;
|
||||
{
|
||||
let mut transactions = transactions.write();
|
||||
if let Some(tx) = transactions.last_mut() {
|
||||
tx.insert(key.clone(), serde_json::Value::Null);
|
||||
tx_data = Some(key.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if tx_data.is_none() {
|
||||
let mut db = db.write();
|
||||
if db.remove(&key).is_some() {
|
||||
Self::remove_from_indexes(&indexes, &key);
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Key not found".to_string())
|
||||
}
|
||||
} else {
|
||||
Response::Success(None)
|
||||
}
|
||||
},
|
||||
Command::BeginTransaction => {
|
||||
let mut transactions = transactions.write();
|
||||
transactions.push(HashMap::new());
|
||||
Response::Success(None)
|
||||
},
|
||||
Command::CommitTransaction => {
|
||||
let mut transactions = transactions.write();
|
||||
if let Some(tx) = transactions.pop() {
|
||||
let mut db = db.write();
|
||||
for (k, v) in tx {
|
||||
if v.is_null() {
|
||||
db.remove(&k);
|
||||
Self::remove_from_indexes(&indexes, &k);
|
||||
} else {
|
||||
db.insert(k.clone(), v.clone());
|
||||
Self::update_indexes(&indexes, &k, &v);
|
||||
}
|
||||
}
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("No active transaction".to_string())
|
||||
}
|
||||
},
|
||||
Command::RollbackTransaction => {
|
||||
let mut transactions = transactions.write();
|
||||
if transactions.pop().is_some() {
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("No active transaction".to_string())
|
||||
}
|
||||
},
|
||||
Command::CreateIndex { field } => {
|
||||
let mut indexes = indexes.write();
|
||||
if !indexes.contains_key(&field) {
|
||||
// Build initial index
|
||||
let mut index = HashMap::new();
|
||||
let db = db.read();
|
||||
for (key, value) in db.iter() {
|
||||
if let Some(field_value) = value.get(&field) {
|
||||
index.entry(field_value.clone())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(key.clone());
|
||||
}
|
||||
}
|
||||
indexes.insert(field, index);
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Index already exists".to_string())
|
||||
}
|
||||
},
|
||||
Command::DropIndex { field } => {
|
||||
let mut indexes = indexes.write();
|
||||
if indexes.remove(&field).is_some() {
|
||||
Response::Success(None)
|
||||
} else {
|
||||
Response::Error("Index not found".to_string())
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let response_bytes = rmp_serde::to_vec(&response)?;
|
||||
let len = response_bytes.len() as u32;
|
||||
socket.write_all(&len.to_be_bytes()).await?;
|
||||
socket.write_all(&response_bytes).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_indexes(
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
key: &str,
|
||||
value: &serde_json::Value,
|
||||
) {
|
||||
let mut indexes = indexes.write();
|
||||
for (field, index) in indexes.iter_mut() {
|
||||
if let Some(field_value) = value.get(field) {
|
||||
// Remove old references
|
||||
for entries in index.values_mut() {
|
||||
if let Some(pos) = entries.iter().position(|k| k == key) {
|
||||
entries.remove(pos);
|
||||
}
|
||||
}
|
||||
// Add new reference
|
||||
index.entry(field_value.clone())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(key.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_from_indexes(
|
||||
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
|
||||
key: &str,
|
||||
) {
|
||||
let mut indexes = indexes.write();
|
||||
for index in indexes.values_mut() {
|
||||
for entries in index.values_mut() {
|
||||
if let Some(pos) = entries.iter().position(|k| k == key) {
|
||||
entries.remove(pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user