first commit
This commit is contained in:
commit
c41f7a8e4c
1897
Cargo.lock
generated
Normal file
1897
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
20
Cargo.toml
Normal file
20
Cargo.toml
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
[package]
|
||||||
|
name = "futriix"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
rlua = "0.20.1"
|
||||||
|
tokio = { version = "1.0", features = ["full"] }
|
||||||
|
warp = "0.3"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
toml = "0.5"
|
||||||
|
rmp-serde = "1.1"
|
||||||
|
rmp = "0.8"
|
||||||
|
parking_lot = "0.12"
|
||||||
|
ansi_term = "0.12"
|
||||||
|
chrono = "0.4"
|
||||||
|
uuid = { version = "1.0", features = ["v4"] }
|
||||||
|
|
||||||
|
|
25
config.toml
Normal file
25
config.toml
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
[server]
|
||||||
|
host = "127.0.0.1"
|
||||||
|
port = 8080
|
||||||
|
log_file = "futriix.log"
|
||||||
|
https_enabled = false
|
||||||
|
http2_enabled = false
|
||||||
|
ssl_cert = "./ssl/cert.pem"
|
||||||
|
ssl_key = "./ssl/key.pem"
|
||||||
|
|
||||||
|
[database]
|
||||||
|
data_dir = "./data"
|
||||||
|
wal_enabled = true
|
||||||
|
wal_dir = "./wal"
|
||||||
|
replication_enabled = false
|
||||||
|
replication_nodes = []
|
||||||
|
sharding_enabled = false
|
||||||
|
shards = 4
|
||||||
|
cluster_mode = true
|
||||||
|
gossip_port = 7946
|
||||||
|
gossip_interval_ms = 1000
|
||||||
|
node_id = "node-1"
|
||||||
|
|
||||||
|
[lua]
|
||||||
|
scripts_dir = "./lua-scripts"
|
||||||
|
auto_load_scripts = true
|
4
futriix.log
Normal file
4
futriix.log
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
=== Futriix Server Started at 2025-09-21 20:44:27 ===
|
||||||
|
=== Futriix Server Started at 2025-09-21 21:10:56 ===
|
||||||
|
=== Futriix Server Started at 2025-09-21 22:02:15 ===
|
||||||
|
=== Futriix Server Started at 2025-09-21 22:41:07 ===
|
517
src/db.rs
Normal file
517
src/db.rs
Normal file
@ -0,0 +1,517 @@
|
|||||||
|
use ansi_term::Colour;
|
||||||
|
use rmp_serde::{Deserializer, Serializer};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
use std::fs::{self, File, OpenOptions};
|
||||||
|
use std::io::{BufReader, BufWriter, Write};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::time::interval;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct Document {
|
||||||
|
pub id: String,
|
||||||
|
pub data: HashMap<String, serde_json::Value>,
|
||||||
|
pub version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Index {
|
||||||
|
primary: RwLock<BTreeMap<String, String>>, // key -> document_id
|
||||||
|
secondary: RwLock<HashMap<String, Vec<String>>>, // field_value -> document_ids
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Transaction {
|
||||||
|
pub operations: Vec<Operation>,
|
||||||
|
pub committed: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub enum Operation {
|
||||||
|
Insert { doc: Document },
|
||||||
|
Update { doc_id: String, data: HashMap<String, serde_json::Value> },
|
||||||
|
Delete { doc_id: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Shard {
|
||||||
|
pub data: RwLock<HashMap<String, Document>>,
|
||||||
|
pub index: Index,
|
||||||
|
pub wal: Option<Wal>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Wal {
|
||||||
|
file: std::sync::Mutex<File>,
|
||||||
|
sequence: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct NodeInfo {
|
||||||
|
pub id: String,
|
||||||
|
pub address: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub status: NodeStatus,
|
||||||
|
pub last_seen: u64,
|
||||||
|
pub version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
|
||||||
|
pub enum NodeStatus {
|
||||||
|
Alive,
|
||||||
|
Suspect,
|
||||||
|
Dead,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct GossipMessage {
|
||||||
|
pub from_node: String,
|
||||||
|
pub nodes: Vec<NodeInfo>,
|
||||||
|
pub sequence: u64,
|
||||||
|
pub timestamp: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct GossipManager {
|
||||||
|
pub node_id: String,
|
||||||
|
pub nodes: RwLock<HashMap<String, NodeInfo>>,
|
||||||
|
pub sequence: AtomicU64,
|
||||||
|
pub tx: mpsc::Sender<GossipMessage>,
|
||||||
|
pub rx: mpsc::Receiver<GossipMessage>,
|
||||||
|
pub config: DatabaseConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FutriixDB {
|
||||||
|
pub shards: Vec<Arc<Shard>>,
|
||||||
|
pub config: DatabaseConfig,
|
||||||
|
pub transactions: RwLock<HashMap<String, Transaction>>,
|
||||||
|
pub lua_modules: RwLock<HashMap<String, String>>, // name -> lua code
|
||||||
|
pub gossip: Arc<GossipManager>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct DatabaseConfig {
|
||||||
|
pub data_dir: String,
|
||||||
|
pub wal_enabled: bool,
|
||||||
|
pub wal_dir: String,
|
||||||
|
pub replication_enabled: bool,
|
||||||
|
pub replication_nodes: Vec<String>,
|
||||||
|
pub sharding_enabled: bool,
|
||||||
|
pub shards: usize,
|
||||||
|
pub cluster_mode: bool,
|
||||||
|
pub gossip_port: u16,
|
||||||
|
pub gossip_interval_ms: u64,
|
||||||
|
pub node_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FutriixDB {
|
||||||
|
pub fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
// Создаем директории если не существуют
|
||||||
|
fs::create_dir_all(&config.data_dir)?;
|
||||||
|
if config.wal_enabled {
|
||||||
|
fs::create_dir_all(&config.wal_dir)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut shards = Vec::new();
|
||||||
|
for i in 0..config.shards {
|
||||||
|
let shard = Arc::new(Shard::new(config, i)?);
|
||||||
|
shards.push(shard);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Инициализируем GOSSIP менеджер
|
||||||
|
let gossip = GossipManager::new(config)?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
shards,
|
||||||
|
config: config.clone(),
|
||||||
|
transactions: RwLock::new(HashMap::new()),
|
||||||
|
lua_modules: RwLock::new(HashMap::new()),
|
||||||
|
gossip: Arc::new(gossip),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert(&self, key: &str, data: HashMap<String, serde_json::Value>) -> Result<(), String> {
|
||||||
|
let shard = self.get_shard(key);
|
||||||
|
let mut shard_data = shard.data.write();
|
||||||
|
|
||||||
|
let document = Document {
|
||||||
|
id: key.to_string(),
|
||||||
|
data,
|
||||||
|
version: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
if shard_data.contains_key(key) {
|
||||||
|
return Err("Key already exists".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Добавляем в WAL если включен
|
||||||
|
if let Some(wal) = &shard.wal {
|
||||||
|
if let Err(e) = wal.write_operation(&Operation::Insert { doc: document.clone() }) {
|
||||||
|
return Err(format!("WAL write error: {}", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
shard_data.insert(key.to_string(), document);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, key: &str) -> Result<Document, String> {
|
||||||
|
let shard = self.get_shard(key);
|
||||||
|
let shard_data = shard.data.read();
|
||||||
|
|
||||||
|
shard_data.get(key)
|
||||||
|
.cloned()
|
||||||
|
.ok_or_else(|| "Key not found".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update(&self, key: &str, data: HashMap<String, serde_json::Value>) -> Result<(), String> {
|
||||||
|
let shard = self.get_shard(key);
|
||||||
|
let mut shard_data = shard.data.write();
|
||||||
|
|
||||||
|
let doc = shard_data.get_mut(key)
|
||||||
|
.ok_or_else(|| "Key not found".to_string())?;
|
||||||
|
|
||||||
|
doc.data = data;
|
||||||
|
doc.version += 1;
|
||||||
|
|
||||||
|
if let Some(wal) = &shard.wal {
|
||||||
|
wal.write_operation(&Operation::Update {
|
||||||
|
doc_id: key.to_string(),
|
||||||
|
data: doc.data.clone()
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete(&self, key: &str) -> Result<(), String> {
|
||||||
|
let shard = self.get_shard(key);
|
||||||
|
let mut shard_data = shard.data.write();
|
||||||
|
|
||||||
|
if !shard_data.contains_key(key) {
|
||||||
|
return Err("Key not found".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(wal) = &shard.wal {
|
||||||
|
wal.write_operation(&Operation::Delete {
|
||||||
|
doc_id: key.to_string()
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
shard_data.remove(key);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_shard(&self, key: &str) -> &Arc<Shard> {
|
||||||
|
let hash = self.hash_key(key);
|
||||||
|
&self.shards[hash % self.shards.len()]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn hash_key(&self, key: &str) -> usize {
|
||||||
|
use std::collections::hash_map::DefaultHasher;
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
key.hash(&mut hasher);
|
||||||
|
hasher.finish() as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_lua_module(&self, name: &str, code: &str) {
|
||||||
|
self.lua_modules.write().insert(name.to_string(), code.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_gossip(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
if self.config.cluster_mode {
|
||||||
|
self.gossip.start().await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Команды для управления кластером
|
||||||
|
pub fn gossip_add_node(&self, address: &str, port: u16) -> Result<(), String> {
|
||||||
|
if !self.config.cluster_mode {
|
||||||
|
return Err("Cluster mode is disabled".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
let node_id = format!("{}:{}", address, port);
|
||||||
|
let node_info = NodeInfo {
|
||||||
|
id: node_id.clone(),
|
||||||
|
address: address.to_string(),
|
||||||
|
port,
|
||||||
|
status: NodeStatus::Alive,
|
||||||
|
last_seen: chrono::Utc::now().timestamp_millis() as u64,
|
||||||
|
version: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.gossip.update_node(node_info);
|
||||||
|
println!("GOSSIP: Node {}:{} added to cluster", address, port);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn gossip_remove_node(&self, node_id: &str) -> Result<(), String> {
|
||||||
|
if !self.config.cluster_mode {
|
||||||
|
return Err("Cluster mode is disabled".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut nodes = self.gossip.nodes.write();
|
||||||
|
if nodes.remove(node_id).is_some() {
|
||||||
|
println!("GOSSIP: Node {} removed from cluster", node_id);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(format!("Node {} not found", node_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn gossip_list_nodes(&self) -> Result<Vec<NodeInfo>, String> {
|
||||||
|
if !self.config.cluster_mode {
|
||||||
|
return Err("Cluster mode is disabled".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.gossip.get_nodes())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn gossip_reconfigure(&self, new_nodes: Vec<(String, u16)>) -> Result<(), String> {
|
||||||
|
if !self.config.cluster_mode {
|
||||||
|
return Err("Cluster mode is disabled".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
let nodes_count = new_nodes.len();
|
||||||
|
let mut nodes = self.gossip.nodes.write();
|
||||||
|
nodes.clear();
|
||||||
|
|
||||||
|
for (address, port) in new_nodes {
|
||||||
|
let node_id = format!("{}:{}", address, port);
|
||||||
|
let node_info = NodeInfo {
|
||||||
|
id: node_id.clone(),
|
||||||
|
address: address.to_string(),
|
||||||
|
port,
|
||||||
|
status: NodeStatus::Alive,
|
||||||
|
last_seen: chrono::Utc::now().timestamp_millis() as u64,
|
||||||
|
version: 0,
|
||||||
|
};
|
||||||
|
nodes.insert(node_id, node_info);
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("GOSSIP: Cluster reconfigured with {} nodes", nodes_count);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Shard {
|
||||||
|
fn new(config: &DatabaseConfig, index: usize) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let wal = if config.wal_enabled {
|
||||||
|
let wal_path = PathBuf::from(&config.wal_dir).join(format!("shard_{}.wal", index));
|
||||||
|
Some(Wal::new(wal_path)?)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
data: RwLock::new(HashMap::new()),
|
||||||
|
index: Index::new(),
|
||||||
|
wal,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Index {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
primary: RwLock::new(BTreeMap::new()),
|
||||||
|
secondary: RwLock::new(HashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Wal {
|
||||||
|
fn new(path: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let file = OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(path)?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
file: std::sync::Mutex::new(file),
|
||||||
|
sequence: AtomicU64::new(0),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_operation(&self, operation: &Operation) -> Result<(), String> {
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
let mut serializer = Serializer::new(&mut buf);
|
||||||
|
operation.serialize(&mut serializer)
|
||||||
|
.map_err(|e| format!("Serialization error: {}", e))?;
|
||||||
|
|
||||||
|
let mut file = self.file.lock().unwrap();
|
||||||
|
file.write_all(&buf)
|
||||||
|
.map_err(|e| format!("Write error: {}", e))?;
|
||||||
|
|
||||||
|
file.write_all(b"\n")
|
||||||
|
.map_err(|e| format!("Write error: {}", e))?;
|
||||||
|
|
||||||
|
self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GossipManager {
|
||||||
|
fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let (tx, rx) = mpsc::channel(100);
|
||||||
|
|
||||||
|
let node_id = config.node_id.clone().unwrap_or_else(|| {
|
||||||
|
format!("node-{}", Uuid::new_v4().to_string()[..8].to_string())
|
||||||
|
});
|
||||||
|
|
||||||
|
println!("GOSSIP: Initializing with node ID: {}", node_id);
|
||||||
|
println!("GOSSIP: Cluster mode: {}", config.cluster_mode);
|
||||||
|
println!("GOSSIP: Port: {}", config.gossip_port);
|
||||||
|
println!("GOSSIP: Interval: {}ms", config.gossip_interval_ms);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
node_id,
|
||||||
|
nodes: RwLock::new(HashMap::new()),
|
||||||
|
sequence: AtomicU64::new(0),
|
||||||
|
tx,
|
||||||
|
rx,
|
||||||
|
config: config.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
if !self.config.cluster_mode {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("GOSSIP: Starting gossip loop with {}ms interval", self.config.gossip_interval_ms);
|
||||||
|
|
||||||
|
let gossip_clone = self.clone_manager();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
gossip_clone.run_gossip_loop().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
println!("Cluster: started successfully");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_gossip_loop(&self) {
|
||||||
|
if !self.config.cluster_mode {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut interval = interval(Duration::from_millis(self.config.gossip_interval_ms));
|
||||||
|
let mut broadcast_count = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
// Ограничиваем частоту вывода сообщений в консоль
|
||||||
|
if broadcast_count % 10 == 0 {
|
||||||
|
let nodes_count = self.nodes.read().len();
|
||||||
|
if nodes_count > 0 {
|
||||||
|
println!("GOSSIP: Broadcasting to {} nodes", nodes_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.broadcast_gossip().await;
|
||||||
|
self.cleanup_dead_nodes();
|
||||||
|
|
||||||
|
broadcast_count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn broadcast_gossip(&self) {
|
||||||
|
let nodes = self.nodes.read().clone();
|
||||||
|
let message = GossipMessage {
|
||||||
|
from_node: self.node_id.clone(),
|
||||||
|
nodes: nodes.values().cloned().collect(),
|
||||||
|
sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
|
||||||
|
timestamp: chrono::Utc::now().timestamp_millis() as u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Здесь будет реализация рассылки сообщений другим узлам
|
||||||
|
// через сетевые сокеты или другие транспортные механизмы
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cleanup_dead_nodes(&self) {
|
||||||
|
let mut nodes = self.nodes.write();
|
||||||
|
let now = chrono::Utc::now().timestamp_millis() as u64;
|
||||||
|
let mut removed_count = 0;
|
||||||
|
|
||||||
|
nodes.retain(|_, node| {
|
||||||
|
if node.status == NodeStatus::Dead {
|
||||||
|
removed_count += 1;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Помечаем узлы как подозрительные если не видели их более 30 секунд
|
||||||
|
if now - node.last_seen > 30000 {
|
||||||
|
node.status = NodeStatus::Suspect;
|
||||||
|
println!("GOSSIP: Node {} marked as suspect", node.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Помечаем узлы как мертвые если не видели их более 60 секунд
|
||||||
|
if now - node.last_seen > 60000 {
|
||||||
|
node.status = NodeStatus::Dead;
|
||||||
|
println!("GOSSIP: Node {} marked as dead", node.id);
|
||||||
|
removed_count += 1;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
});
|
||||||
|
|
||||||
|
if removed_count > 0 {
|
||||||
|
println!("GOSSIP: Removed {} dead nodes", removed_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_node(&self, node_info: NodeInfo) {
|
||||||
|
let node_id = node_info.id.clone();
|
||||||
|
let mut nodes = self.nodes.write();
|
||||||
|
nodes.insert(node_info.id.clone(), node_info);
|
||||||
|
println!("GOSSIP: Node {} updated", node_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_nodes(&self) -> Vec<NodeInfo> {
|
||||||
|
let nodes = self.nodes.read();
|
||||||
|
nodes.values().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clone_manager(&self) -> Arc<Self> {
|
||||||
|
// Создаем новый канал для клонированного менеджера
|
||||||
|
let (tx, rx) = mpsc::channel(100);
|
||||||
|
|
||||||
|
Arc::new(Self {
|
||||||
|
node_id: self.node_id.clone(),
|
||||||
|
nodes: RwLock::new(self.nodes.read().clone()),
|
||||||
|
sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
|
||||||
|
tx,
|
||||||
|
rx,
|
||||||
|
config: self.config.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Реализация ручной клонирования для GossipManager
|
||||||
|
impl Clone for GossipManager {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
// Создаем новый канал для клонированного менеджера
|
||||||
|
let (tx, rx) = mpsc::channel(100);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
node_id: self.node_id.clone(),
|
||||||
|
nodes: RwLock::new(self.nodes.read().clone()),
|
||||||
|
sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
|
||||||
|
tx,
|
||||||
|
rx,
|
||||||
|
config: self.config.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
166
src/lua_interpreter.rs
Normal file
166
src/lua_interpreter.rs
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
use crate::db::FutriixDB;
|
||||||
|
use ansi_term::Colour;
|
||||||
|
use rlua::{Function, Lua, Result as LuaResult, Value};
|
||||||
|
use std::fs;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub struct LuaInterpreter {
|
||||||
|
lua: Lua,
|
||||||
|
db: Arc<Mutex<FutriixDB>>,
|
||||||
|
scripts_dir: String,
|
||||||
|
history: Vec<String>,
|
||||||
|
in_db_mode: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LuaInterpreter {
|
||||||
|
pub fn new(db: Arc<Mutex<FutriixDB>>, scripts_dir: String) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let lua = Lua::new();
|
||||||
|
|
||||||
|
let mut interpreter = Self {
|
||||||
|
lua,
|
||||||
|
db,
|
||||||
|
scripts_dir,
|
||||||
|
history: Vec::new(),
|
||||||
|
in_db_mode: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
interpreter.setup_lua_environment()?;
|
||||||
|
Ok(interpreter)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_lua_environment(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let lua = &self.lua;
|
||||||
|
lua.scope(|scope| {
|
||||||
|
// Глобальные функции для работы с БД
|
||||||
|
let globals = lua.globals();
|
||||||
|
|
||||||
|
// Функция для запуска DB mode
|
||||||
|
let start_db = lua.create_function(|_, ()| {
|
||||||
|
Ok("Switching to database mode...")
|
||||||
|
})?;
|
||||||
|
globals.set("inbox", lua.create_table()?)?;
|
||||||
|
globals.get::<_, rlua::Table>("inbox")?
|
||||||
|
.set("start", start_db)?;
|
||||||
|
|
||||||
|
// Функция выхода
|
||||||
|
let exit_func = lua.create_function(|_, ()| -> rlua::Result<()> {
|
||||||
|
std::process::exit(0);
|
||||||
|
})?;
|
||||||
|
globals.set("exit", exit_func)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
self.load_lua_scripts()?;
|
||||||
|
|
||||||
|
println!("{}", Colour::RGB(0x00, 0xBF, 0xFF).paint("Type inbox.start to enter database mode"));
|
||||||
|
println!();
|
||||||
|
|
||||||
|
let mut input = String::new();
|
||||||
|
loop {
|
||||||
|
if self.in_db_mode {
|
||||||
|
print!("{}", Colour::RGB(0x00, 0xBF, 0xFF).paint("futriiX:~> "));
|
||||||
|
} else {
|
||||||
|
print!("{}", Colour::RGB(0x90, 0xEE, 0x90).paint("lua> ")); // Светло-зеленый цвет
|
||||||
|
}
|
||||||
|
|
||||||
|
std::io::Write::flush(&mut std::io::stdout())?;
|
||||||
|
|
||||||
|
input.clear();
|
||||||
|
std::io::stdin().read_line(&mut input)?;
|
||||||
|
let input = input.trim();
|
||||||
|
|
||||||
|
if input.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.history.push(input.to_string());
|
||||||
|
|
||||||
|
match input {
|
||||||
|
"exit" => break,
|
||||||
|
"inbox.start" => {
|
||||||
|
self.in_db_mode = true;
|
||||||
|
println!("{}", Colour::Green.paint("Database mode activated"));
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = self.execute_lua(input).await {
|
||||||
|
println!("{}: {}", Colour::Red.paint("Error"), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute_lua(&self, code: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let result = self.lua.scope(|scope| {
|
||||||
|
self.lua.load(code).eval::<Value>()
|
||||||
|
});
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(value) => {
|
||||||
|
if !value.is_nil() {
|
||||||
|
println!("=> {}", self.value_to_string(value));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
Err(Box::new(e) as Box<dyn std::error::Error>)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn value_to_string(&self, value: Value) -> String {
|
||||||
|
match value {
|
||||||
|
Value::String(s) => s.to_str().unwrap_or("").to_string(),
|
||||||
|
Value::Number(n) => n.to_string(),
|
||||||
|
Value::Boolean(b) => b.to_string(),
|
||||||
|
Value::Nil => "nil".to_string(),
|
||||||
|
Value::Table(t) => {
|
||||||
|
let mut result = String::from("{");
|
||||||
|
for pair in t.pairs::<Value, Value>() {
|
||||||
|
if let Ok((k, v)) = pair {
|
||||||
|
result.push_str(&format!("{}: {}, ", self.value_to_string(k), self.value_to_string(v)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result.push('}');
|
||||||
|
result
|
||||||
|
}
|
||||||
|
_ => "unknown".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_lua_scripts(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let path = PathBuf::from(&self.scripts_dir);
|
||||||
|
if !path.exists() {
|
||||||
|
fs::create_dir_all(&path)?;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry in fs::read_dir(path)? {
|
||||||
|
let entry = entry?;
|
||||||
|
let path = entry.path();
|
||||||
|
|
||||||
|
if path.extension().and_then(|s| s.to_str()) == Some("lua") {
|
||||||
|
if let Ok(script) = fs::read_to_string(&path) {
|
||||||
|
println!("Loading script: {}", path.display());
|
||||||
|
if let Err(e) = self.lua.scope(|scope| {
|
||||||
|
self.lua.load(&script).exec()
|
||||||
|
}) {
|
||||||
|
println!("{} loading script {}: {}", Colour::Red.paint("Error"), path.display(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
24
src/main.rs
Normal file
24
src/main.rs
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
mod server;
|
||||||
|
mod db;
|
||||||
|
mod lua_interpreter;
|
||||||
|
|
||||||
|
use ansi_term::Colour;
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
// Выводим приветственное сообщение
|
||||||
|
println!();
|
||||||
|
println!("{}", Colour::RGB(0x00, 0xBF, 0xFF).paint("futriiX database server"));
|
||||||
|
|
||||||
|
// Инициализируем сервер
|
||||||
|
let mut server = server::FutriixServer::new()?;
|
||||||
|
|
||||||
|
// Запускаем GOSSIP протокол синхронно
|
||||||
|
server.start_gossip().await?;
|
||||||
|
|
||||||
|
// Запускаем основной сервер
|
||||||
|
server.run().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
160
src/server.rs
Normal file
160
src/server.rs
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
use crate::db::FutriixDB;
|
||||||
|
use crate::db;
|
||||||
|
use crate::lua_interpreter::LuaInterpreter;
|
||||||
|
use ansi_term::Colour;
|
||||||
|
use chrono::Local;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use std::fs::OpenOptions;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct ServerConfig {
|
||||||
|
pub host: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub log_file: String,
|
||||||
|
pub https_enabled: bool,
|
||||||
|
pub http2_enabled: bool,
|
||||||
|
pub ssl_cert: Option<String>,
|
||||||
|
pub ssl_key: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct DatabaseConfig {
|
||||||
|
pub data_dir: String,
|
||||||
|
pub wal_enabled: bool,
|
||||||
|
pub wal_dir: String,
|
||||||
|
pub replication_enabled: bool,
|
||||||
|
pub replication_nodes: Vec<String>,
|
||||||
|
pub sharding_enabled: bool,
|
||||||
|
pub shards: usize,
|
||||||
|
pub cluster_mode: bool,
|
||||||
|
pub gossip_port: u16,
|
||||||
|
pub gossip_interval_ms: u64,
|
||||||
|
pub node_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct LuaConfig {
|
||||||
|
pub scripts_dir: String,
|
||||||
|
pub auto_load_scripts: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct Config {
|
||||||
|
pub server: ServerConfig,
|
||||||
|
pub database: DatabaseConfig,
|
||||||
|
pub lua: LuaConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FutriixServer {
|
||||||
|
config: Config,
|
||||||
|
db: Arc<Mutex<FutriixDB>>,
|
||||||
|
lua_interpreter: LuaInterpreter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FutriixServer {
|
||||||
|
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
// Загрузка конфигурации
|
||||||
|
let config_str = std::fs::read_to_string("config.toml")?;
|
||||||
|
let config: Config = toml::from_str(&config_str)?;
|
||||||
|
|
||||||
|
// Создание лог-файла с датой и временем
|
||||||
|
let mut log_file = OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&config.server.log_file)?;
|
||||||
|
|
||||||
|
let now = Local::now();
|
||||||
|
writeln!(log_file, "=== Futriix Server Started at {} ===", now.format("%Y-%m-%d %H:%M:%S"))?;
|
||||||
|
|
||||||
|
// Инициализация базы данных
|
||||||
|
let db_config = db::DatabaseConfig {
|
||||||
|
data_dir: config.database.data_dir.clone(),
|
||||||
|
wal_enabled: config.database.wal_enabled,
|
||||||
|
wal_dir: config.database.wal_dir.clone(),
|
||||||
|
replication_enabled: config.database.replication_enabled,
|
||||||
|
replication_nodes: config.database.replication_nodes.clone(),
|
||||||
|
sharding_enabled: config.database.sharding_enabled,
|
||||||
|
shards: config.database.shards,
|
||||||
|
cluster_mode: config.database.cluster_mode,
|
||||||
|
gossip_port: config.database.gossip_port,
|
||||||
|
gossip_interval_ms: config.database.gossip_interval_ms,
|
||||||
|
node_id: config.database.node_id.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let db = FutriixDB::new(&db_config)?;
|
||||||
|
|
||||||
|
// Создаем Arc для базы данных
|
||||||
|
let db_arc = Arc::new(Mutex::new(db));
|
||||||
|
|
||||||
|
// Инициализация Lua интерпретатора
|
||||||
|
let lua_interpreter = LuaInterpreter::new(db_arc.clone(), config.lua.scripts_dir.clone())?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
config,
|
||||||
|
db: db_arc,
|
||||||
|
lua_interpreter,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
// Запуск HTTP сервера
|
||||||
|
self.start_http_server()?;
|
||||||
|
|
||||||
|
// Запуск Lua интерпретатора
|
||||||
|
self.start_lua_interpreter().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_gossip(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let db = self.db.lock().await;
|
||||||
|
if db.config.cluster_mode {
|
||||||
|
db.start_gossip().await?;
|
||||||
|
} else {
|
||||||
|
println!("{}", Colour::White.paint("Cluster mode disabled in config"));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_http_server(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let protocol = if self.config.server.https_enabled {
|
||||||
|
"HTTPS"
|
||||||
|
} else {
|
||||||
|
"HTTP"
|
||||||
|
};
|
||||||
|
|
||||||
|
let http2_support = if self.config.server.http2_enabled {
|
||||||
|
" with HTTP/2"
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("{}", Colour::White.paint(
|
||||||
|
format!("{} server starting on port {}{}", protocol, self.config.server.port, http2_support)
|
||||||
|
));
|
||||||
|
|
||||||
|
if self.config.server.https_enabled {
|
||||||
|
println!("{}", Colour::White.paint("SSL/TLS encryption enabled"));
|
||||||
|
if let Some(cert_path) = &self.config.server.ssl_cert {
|
||||||
|
println!("{}", Colour::White.paint(format!("SSL certificate: {}", cert_path)));
|
||||||
|
}
|
||||||
|
if let Some(key_path) = &self.config.server.ssl_key {
|
||||||
|
println!("{}", Colour::White.paint(format!("SSL key: {}", key_path)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.config.server.http2_enabled {
|
||||||
|
println!("{}", Colour::White.paint("HTTP/2 protocol enabled"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_lua_interpreter(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
self.lua_interpreter.run().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
0
wal/shard_0.wal
Normal file
0
wal/shard_0.wal
Normal file
0
wal/shard_1.wal
Normal file
0
wal/shard_1.wal
Normal file
0
wal/shard_2.wal
Normal file
0
wal/shard_2.wal
Normal file
0
wal/shard_3.wal
Normal file
0
wal/shard_3.wal
Normal file
Loading…
x
Reference in New Issue
Block a user