first rust-server commit

Григорий Сафронов 2025-05-24 00:26:45 +03:00
commit 1e750ad9e1
8 changed files with 2321 additions and 0 deletions

Cargo.lock (generated, 1676 lines added)
File diff suppressed because it is too large

Cargo.toml (new file, 19 lines added)

@@ -0,0 +1,19 @@
[package]
name = "futriix"
version = "0.1.0"
edition = "2024"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
redis = { version = "0.23", features = ["cluster"] }
config = "0.13"
log = "0.4"
pretty_env_logger = "0.4"
colored = "2.0"
anyhow = "1.0"
thiserror = "1.0"
log4rs = "1.3.0"

futriix.conf (new file, 16 lines added)

@@ -0,0 +1,16 @@
# Futriix Configuration File
# Port to listen on
port = 6379
# Path to log file
log_path = "futriix.log"
# Cluster mode (true/false)
cluster_mode = false
# Redis backend nodes (standalone mode uses the first entry; cluster mode uses all of them)
redis_nodes = [
"redis://127.0.0.1:6379",
"redis://127.0.0.1:6380"
]

src/commands.rs (new file, 380 lines added)

@@ -0,0 +1,380 @@
use redis::{RedisResult, cluster::ClusterConnection, Value as RedisValue};
use redis::cmd;
use serde_json::{Value, json};
use std::time::Instant;
use log::info;
#[derive(Debug)]
pub enum Command {
Eval(String),
Multi(Vec<String>),
Discard,
Exec,
JsonArrAppend(Vec<String>),
JsonArrIndex(Vec<String>),
JsonArrInsert(Vec<String>),
JsonArrLen(Vec<String>),
JsonArrPop(Vec<String>),
JsonArrTrim(Vec<String>),
JsonClear(Vec<String>),
JsonDebug(Vec<String>),
JsonDel(Vec<String>),
JsonForget(Vec<String>),
JsonGet(Vec<String>),
JsonMGet(Vec<String>),
JsonMSet(Vec<String>),
JsonNumIncrBy(Vec<String>),
JsonNumMultBy(Vec<String>),
JsonObjKeys(Vec<String>),
JsonObjLen(Vec<String>),
JsonResp(Vec<String>),
JsonSet(Vec<String>),
JsonStrAppend(Vec<String>),
JsonStrLen(Vec<String>),
JsonToggle(Vec<String>),
JsonType(Vec<String>),
}
impl Command {
pub fn parse(input: &str) -> Option<Self> {
let parts: Vec<&str> = input.split_whitespace().collect();
if parts.is_empty() {
return None;
}
match parts[0].to_uppercase().as_str() {
"EVAL" => {
if parts.len() >= 2 {
Some(Command::Eval(parts[1..].join(" ")))
} else {
None
}
}
"MULTI" => {
if parts.len() >= 2 {
Some(Command::Multi(parts[1..].iter().map(|s| s.to_string()).collect()))
} else {
None
}
}
"DISCARD" => Some(Command::Discard),
"EXEC" => Some(Command::Exec),
"JSON.ARRAPPEND" => Some(Command::JsonArrAppend(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.ARRINDEX" => Some(Command::JsonArrIndex(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.ARRINSERT" => Some(Command::JsonArrInsert(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.ARRLEN" => Some(Command::JsonArrLen(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.ARRPOP" => Some(Command::JsonArrPop(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.ARRTRIM" => Some(Command::JsonArrTrim(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.CLEAR" => Some(Command::JsonClear(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.DEBUG" => Some(Command::JsonDebug(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.DEL" => Some(Command::JsonDel(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.FORGET" => Some(Command::JsonForget(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.GET" => Some(Command::JsonGet(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.MGET" => Some(Command::JsonMGet(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.MSET" => Some(Command::JsonMSet(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.NUMINCRBY" => Some(Command::JsonNumIncrBy(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.NUMMULTBY" => Some(Command::JsonNumMultBy(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.OBJKEYS" => Some(Command::JsonObjKeys(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.OBJLEN" => Some(Command::JsonObjLen(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.RESP" => Some(Command::JsonResp(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.SET" => Some(Command::JsonSet(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.STRAPPEND" => Some(Command::JsonStrAppend(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.STRLEN" => Some(Command::JsonStrLen(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.TOGGLE" => Some(Command::JsonToggle(parts[1..].iter().map(|s| s.to_string()).collect())),
"JSON.TYPE" => Some(Command::JsonType(parts[1..].iter().map(|s| s.to_string()).collect())),
_ => None,
}
}
fn redis_value_to_json(value: RedisValue) -> Value {
match value {
RedisValue::Nil => json!(null),
RedisValue::Int(i) => json!(i),
RedisValue::Data(bytes) => Value::String(String::from_utf8_lossy(&bytes).into_owned()),
RedisValue::Bulk(values) => Value::Array(values.into_iter().map(Self::redis_value_to_json).collect()),
RedisValue::Status(s) => json!(s),
RedisValue::Okay => json!("OK"),
}
}
fn execute_json_command(cmd_name: &str, args: Vec<String>, conn: &mut impl redis::ConnectionLike) -> RedisResult<Value> {
let mut cmd = cmd(cmd_name);
for arg in args {
cmd.arg(arg);
}
let result = cmd.query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
pub fn execute_standalone(&self, conn: &mut redis::Connection) -> RedisResult<Value> {
let start_time = Instant::now();
let result = match self {
Command::Eval(script) => {
info!("Executing EVAL: {}", script);
let result = cmd("EVAL")
.arg(script)
.arg(0)
.query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
Command::Multi(commands) => {
info!("Executing MULTI with {} commands", commands.len());
let mut pipe = redis::pipe();
for cmd_str in commands {
let parts: Vec<&str> = cmd_str.split_whitespace().collect();
let mut cmd = cmd(parts[0]);
for arg in &parts[1..] {
cmd.arg(*arg);
}
pipe.add_command(cmd);
}
let result = pipe.query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
Command::Discard => {
info!("Executing DISCARD");
let result = cmd("DISCARD").query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
Command::Exec => {
info!("Executing EXEC");
let result = cmd("EXEC").query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
// Handle JSON commands
Command::JsonArrAppend(args) => {
info!("Executing JSON.ARRAPPEND");
Self::execute_json_command("JSON.ARRAPPEND", args.clone(), conn)
}
Command::JsonArrIndex(args) => {
info!("Executing JSON.ARRINDEX");
Self::execute_json_command("JSON.ARRINDEX", args.clone(), conn)
}
Command::JsonArrInsert(args) => {
info!("Executing JSON.ARRINSERT");
Self::execute_json_command("JSON.ARRINSERT", args.clone(), conn)
}
Command::JsonArrLen(args) => {
info!("Executing JSON.ARRLEN");
Self::execute_json_command("JSON.ARRLEN", args.clone(), conn)
}
Command::JsonArrPop(args) => {
info!("Executing JSON.ARRPOP");
Self::execute_json_command("JSON.ARRPOP", args.clone(), conn)
}
Command::JsonArrTrim(args) => {
info!("Executing JSON.ARRTRIM");
Self::execute_json_command("JSON.ARRTRIM", args.clone(), conn)
}
Command::JsonClear(args) => {
info!("Executing JSON.CLEAR");
Self::execute_json_command("JSON.CLEAR", args.clone(), conn)
}
Command::JsonDebug(args) => {
info!("Executing JSON.DEBUG");
Self::execute_json_command("JSON.DEBUG", args.clone(), conn)
}
Command::JsonDel(args) => {
info!("Executing JSON.DEL");
Self::execute_json_command("JSON.DEL", args.clone(), conn)
}
Command::JsonForget(args) => {
info!("Executing JSON.FORGET");
Self::execute_json_command("JSON.FORGET", args.clone(), conn)
}
Command::JsonGet(args) => {
info!("Executing JSON.GET");
Self::execute_json_command("JSON.GET", args.clone(), conn)
}
Command::JsonMGet(args) => {
info!("Executing JSON.MGET");
Self::execute_json_command("JSON.MGET", args.clone(), conn)
}
Command::JsonMSet(args) => {
info!("Executing JSON.MSET");
Self::execute_json_command("JSON.MSET", args.clone(), conn)
}
Command::JsonNumIncrBy(args) => {
info!("Executing JSON.NUMINCRBY");
Self::execute_json_command("JSON.NUMINCRBY", args.clone(), conn)
}
Command::JsonNumMultBy(args) => {
info!("Executing JSON.NUMMULTBY");
Self::execute_json_command("JSON.NUMMULTBY", args.clone(), conn)
}
Command::JsonObjKeys(args) => {
info!("Executing JSON.OBJKEYS");
Self::execute_json_command("JSON.OBJKEYS", args.clone(), conn)
}
Command::JsonObjLen(args) => {
info!("Executing JSON.OBJLEN");
Self::execute_json_command("JSON.OBJLEN", args.clone(), conn)
}
Command::JsonResp(args) => {
info!("Executing JSON.RESP");
Self::execute_json_command("JSON.RESP", args.clone(), conn)
}
Command::JsonSet(args) => {
info!("Executing JSON.SET");
Self::execute_json_command("JSON.SET", args.clone(), conn)
}
Command::JsonStrAppend(args) => {
info!("Executing JSON.STRAPPEND");
Self::execute_json_command("JSON.STRAPPEND", args.clone(), conn)
}
Command::JsonStrLen(args) => {
info!("Executing JSON.STRLEN");
Self::execute_json_command("JSON.STRLEN", args.clone(), conn)
}
Command::JsonToggle(args) => {
info!("Executing JSON.TOGGLE");
Self::execute_json_command("JSON.TOGGLE", args.clone(), conn)
}
Command::JsonType(args) => {
info!("Executing JSON.TYPE");
Self::execute_json_command("JSON.TYPE", args.clone(), conn)
}
};
let duration = start_time.elapsed();
info!("Command executed in {:?}", duration);
result
}
pub fn execute_cluster(&self, conn: &mut ClusterConnection) -> RedisResult<Value> {
let start_time = Instant::now();
let result = match self {
Command::Eval(script) => {
info!("Executing EVAL (cluster): {}", script);
let result = cmd("EVAL")
.arg(script)
.arg(0)
.query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
Command::Multi(commands) => {
info!("Executing MULTI with {} commands (cluster)", commands.len());
let mut pipe = redis::pipe();
for cmd_str in commands {
let parts: Vec<&str> = cmd_str.split_whitespace().collect();
let mut cmd = cmd(parts[0]);
for arg in &parts[1..] {
cmd.arg(*arg);
}
pipe.add_command(cmd);
}
let result = pipe.query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
Command::Discard => {
info!("Executing DISCARD (cluster)");
let result = cmd("DISCARD").query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
Command::Exec => {
info!("Executing EXEC (cluster)");
let result = cmd("EXEC").query::<RedisValue>(conn)?;
Ok(Self::redis_value_to_json(result))
}
// Handle JSON commands in cluster mode
Command::JsonArrAppend(args) => {
info!("Executing JSON.ARRAPPEND (cluster)");
Self::execute_json_command("JSON.ARRAPPEND", args.clone(), conn)
}
Command::JsonArrIndex(args) => {
info!("Executing JSON.ARRINDEX (cluster)");
Self::execute_json_command("JSON.ARRINDEX", args.clone(), conn)
}
Command::JsonArrInsert(args) => {
info!("Executing JSON.ARRINSERT (cluster)");
Self::execute_json_command("JSON.ARRINSERT", args.clone(), conn)
}
Command::JsonArrLen(args) => {
info!("Executing JSON.ARRLEN (cluster)");
Self::execute_json_command("JSON.ARRLEN", args.clone(), conn)
}
Command::JsonArrPop(args) => {
info!("Executing JSON.ARRPOP (cluster)");
Self::execute_json_command("JSON.ARRPOP", args.clone(), conn)
}
Command::JsonArrTrim(args) => {
info!("Executing JSON.ARRTRIM (cluster)");
Self::execute_json_command("JSON.ARRTRIM", args.clone(), conn)
}
Command::JsonClear(args) => {
info!("Executing JSON.CLEAR (cluster)");
Self::execute_json_command("JSON.CLEAR", args.clone(), conn)
}
Command::JsonDebug(args) => {
info!("Executing JSON.DEBUG (cluster)");
Self::execute_json_command("JSON.DEBUG", args.clone(), conn)
}
Command::JsonDel(args) => {
info!("Executing JSON.DEL (cluster)");
Self::execute_json_command("JSON.DEL", args.clone(), conn)
}
Command::JsonForget(args) => {
info!("Executing JSON.FORGET (cluster)");
Self::execute_json_command("JSON.FORGET", args.clone(), conn)
}
Command::JsonGet(args) => {
info!("Executing JSON.GET (cluster)");
Self::execute_json_command("JSON.GET", args.clone(), conn)
}
Command::JsonMGet(args) => {
info!("Executing JSON.MGET (cluster)");
Self::execute_json_command("JSON.MGET", args.clone(), conn)
}
Command::JsonMSet(args) => {
info!("Executing JSON.MSET (cluster)");
Self::execute_json_command("JSON.MSET", args.clone(), conn)
}
Command::JsonNumIncrBy(args) => {
info!("Executing JSON.NUMINCRBY (cluster)");
Self::execute_json_command("JSON.NUMINCRBY", args.clone(), conn)
}
Command::JsonNumMultBy(args) => {
info!("Executing JSON.NUMMULTBY (cluster)");
Self::execute_json_command("JSON.NUMMULTBY", args.clone(), conn)
}
Command::JsonObjKeys(args) => {
info!("Executing JSON.OBJKEYS (cluster)");
Self::execute_json_command("JSON.OBJKEYS", args.clone(), conn)
}
Command::JsonObjLen(args) => {
info!("Executing JSON.OBJLEN (cluster)");
Self::execute_json_command("JSON.OBJLEN", args.clone(), conn)
}
Command::JsonResp(args) => {
info!("Executing JSON.RESP (cluster)");
Self::execute_json_command("JSON.RESP", args.clone(), conn)
}
Command::JsonSet(args) => {
info!("Executing JSON.SET (cluster)");
Self::execute_json_command("JSON.SET", args.clone(), conn)
}
Command::JsonStrAppend(args) => {
info!("Executing JSON.STRAPPEND (cluster)");
Self::execute_json_command("JSON.STRAPPEND", args.clone(), conn)
}
Command::JsonStrLen(args) => {
info!("Executing JSON.STRLEN (cluster)");
Self::execute_json_command("JSON.STRLEN", args.clone(), conn)
}
Command::JsonToggle(args) => {
info!("Executing JSON.TOGGLE (cluster)");
Self::execute_json_command("JSON.TOGGLE", args.clone(), conn)
}
Command::JsonType(args) => {
info!("Executing JSON.TYPE (cluster)");
Self::execute_json_command("JSON.TYPE", args.clone(), conn)
}
};
let duration = start_time.elapsed();
info!("Command executed in {:?}", duration);
result
}
}
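
A minimal usage sketch for the parser above (hypothetical, not part of the commit; it only exercises Command::parse and needs no Redis connection):

// Hypothetical example: a raw command line is split on whitespace and mapped to a variant.
use crate::commands::Command;

fn parse_demo() {
    match Command::parse(r#"JSON.SET user:1 $ {"name":"Ann"}"#) {
        Some(Command::JsonSet(args)) => {
            // args are the whitespace-separated tokens after the command name
            assert_eq!(args, vec!["user:1", "$", r#"{"name":"Ann"}"#]);
        }
        _ => panic!("expected JSON.SET to parse"),
    }
    // Commands outside the supported set, and empty input, yield None
    assert!(Command::parse("PING").is_none());
    assert!(Command::parse("").is_none());
}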

src/config.rs (new file, 36 lines added)

@@ -0,0 +1,36 @@
use serde::Deserialize;
use std::path::PathBuf;
use config::ConfigError;
#[derive(Debug, Deserialize, Clone)]
pub struct ServerConfig {
pub port: u16,
pub log_path: PathBuf,
pub cluster_mode: bool,
pub redis_nodes: Vec<String>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
port: 6379,
log_path: "futriix.log".into(),
cluster_mode: false,
redis_nodes: vec!["redis://127.0.0.1:6379".to_string()],
}
}
}
impl ServerConfig {
pub fn load() -> Result<Self, ConfigError> {
let mut builder = config::Config::builder()
.set_default("port", 6379)?
.set_default("log_path", "futriix.log")?
.set_default("cluster_mode", false)?
.set_default("redis_nodes", vec!["redis://127.0.0.1:6379"])?;
builder = builder.add_source(config::File::with_name("futriix").required(false));
builder.build()?.try_deserialize()
}
}
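
A small hypothetical sketch (not part of the commit) of how this loader behaves: the file source is optional, so load() still succeeds and returns the built-in defaults when futriix.conf is absent.

// Hypothetical example of the fallback behaviour described above.
use crate::config::ServerConfig;

fn config_demo() {
    let cfg = ServerConfig::load().expect("defaults should always deserialize");
    // Without a futriix.conf on disk these are the defaults; the shipped
    // futriix.conf happens to set the same values explicitly.
    assert_eq!(cfg.port, 6379);
    assert!(!cfg.cluster_mode);
    assert_eq!(cfg.redis_nodes[0], "redis://127.0.0.1:6379");
}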

src/logging.rs (new file, 39 lines added)

@@ -0,0 +1,39 @@
use std::path::PathBuf;
use log4rs::{
append::{
console::ConsoleAppender,
file::FileAppender,
},
config::{Appender, Config, Root},
encode::pattern::PatternEncoder,
};
use log::info;
pub fn setup_logging(log_path: &PathBuf) -> anyhow::Result<()> {
let logfile = FileAppender::builder()
.encoder(Box::new(PatternEncoder::new("{d} {l} - {m}\n")))
.build(log_path)?;
let stdout = ConsoleAppender::builder()
.encoder(Box::new(PatternEncoder::new("{h({l})} {m}\n")))
.build();
let config = Config::builder()
.appender(Appender::builder().build("stdout", Box::new(stdout)))
.appender(Appender::builder().build("logfile", Box::new(logfile)))
.build(
Root::builder()
.appender("stdout")
.appender("logfile")
.build(log::LevelFilter::Info),
)?;
log4rs::init_config(config)?;
// This message will now be printed right after the server name
println!("\n");
info!("Logging initialized. Log file: {}", log_path.display());
Ok(())
}

src/main.rs (new file, 22 lines added)

@@ -0,0 +1,22 @@
mod config;
mod commands;
mod server;
mod logging;
use std::sync::Arc;
use crate::config::ServerConfig;
use log::error;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Arc::new(ServerConfig::load()?);
logging::setup_logging(&config.log_path)?;
server::print_startup_info(&config);
if let Err(e) = server::start_server(config).await {
error!("Server error: {}", e);
return Err(e);
}
Ok(())
}

src/server.rs (new file, 133 lines added)

@@ -0,0 +1,133 @@
use tokio::net::TcpListener;
use std::sync::Arc;
use crate::config::ServerConfig;
use log::{info, error};
use colored::Colorize;
pub fn print_startup_info(config: &ServerConfig) {
println!();
println!("{}", "Futriix-Distributed JSON-server".bold().green());
println!("Process ID: {}", std::process::id());
println!("Port: {}", config.port);
println!("Mode: {}", if config.cluster_mode {
"Cluster-mode".bold().blue()
} else {
"Standalone-mode".bold().yellow()
});
println!("Log file: {}", config.log_path.display());
println!();
}
async fn handle_standalone_connection(
mut stream: tokio::net::TcpStream,
client: redis::Client,
_config: Arc<ServerConfig>,
) -> anyhow::Result<()> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
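// Note: get_connection() returns a synchronous redis::Connection, so the
// Redis calls below block the Tokio worker thread while each command runs;
// the 1 KiB buffer also assumes every client message fits in a single read.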
let mut conn = client.get_connection()?;
let mut buffer = [0; 1024];
loop {
let n = stream.read(&mut buffer).await?;
if n == 0 {
break;
}
let input = String::from_utf8_lossy(&buffer[..n]);
info!("Received command: {}", input.trim());
if let Some(cmd) = crate::commands::Command::parse(&input) {
match cmd.execute_standalone(&mut conn) {
Ok(result) => {
let response = serde_json::to_string(&result)?;
stream.write_all(response.as_bytes()).await?;
}
Err(e) => {
let error_msg = format!("Error: {}", e);
stream.write_all(error_msg.as_bytes()).await?;
}
}
} else {
let error_msg = "Error: Invalid command";
stream.write_all(error_msg.as_bytes()).await?;
}
}
Ok(())
}
async fn handle_cluster_connection(
mut stream: tokio::net::TcpStream,
client: redis::cluster::ClusterClient,
_config: Arc<ServerConfig>,
) -> anyhow::Result<()> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut conn = client.get_connection()?;
let mut buffer = [0; 1024];
loop {
let n = stream.read(&mut buffer).await?;
if n == 0 {
break;
}
let input = String::from_utf8_lossy(&buffer[..n]);
info!("Received command (cluster): {}", input.trim());
if let Some(cmd) = crate::commands::Command::parse(&input) {
match cmd.execute_cluster(&mut conn) {
Ok(result) => {
let response = serde_json::to_string(&result)?;
stream.write_all(response.as_bytes()).await?;
}
Err(e) => {
let error_msg = format!("Error: {}", e);
stream.write_all(error_msg.as_bytes()).await?;
}
}
} else {
let error_msg = "Error: Invalid command";
stream.write_all(error_msg.as_bytes()).await?;
}
}
Ok(())
}
pub async fn start_server(config: Arc<ServerConfig>) -> anyhow::Result<()> {
let listener = TcpListener::bind(format!("0.0.0.0:{}", config.port)).await?;
info!("Server started on port {}", config.port);
if config.cluster_mode {
info!("Running in Cluster mode");
let client = redis::cluster::ClusterClient::new(config.redis_nodes.clone())?;
while let Ok((stream, _)) = listener.accept().await {
let client = client.clone();
let config_clone = config.clone();
tokio::spawn(async move {
if let Err(e) = handle_cluster_connection(stream, client, config_clone).await {
error!("Connection error: {}", e);
}
});
}
} else {
info!("Running in Standalone mode");
let client = redis::Client::open(config.redis_nodes[0].clone())?;
while let Ok((stream, _)) = listener.accept().await {
let client = client.clone();
let config_clone = config.clone();
tokio::spawn(async move {
if let Err(e) = handle_standalone_connection(stream, client, config_clone).await {
error!("Connection error: {}", e);
}
});
}
}
Ok(())
}
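
For reference, a hypothetical client sketch (not part of the commit) that talks to this server over plain TCP. It assumes the server runs with the default config and that the backing Redis instance has the RedisJSON module loaded; note that the default port = 6379 collides with the standard Redis port, so on a single host one of the two should be moved.

// Hypothetical client: send one command as plain text, read the JSON reply.
use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Port taken from futriix.conf (default 6379).
    let mut stream = TcpStream::connect("127.0.0.1:6379")?;

    // The server expects a whitespace-separated command line...
    stream.write_all(br#"JSON.SET user:1 $ {"name":"Ann"}"#)?;

    // ...and replies with either the result serialized as JSON
    // or a plain "Error: ..." string.
    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf)?;
    println!("{}", String::from_utf8_lossy(&buf[..n]));
    Ok(())
}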