Delete src/db.rs
This commit is contained in:
parent
be2c30a9fe
commit
867e9e5f4b
517
src/db.rs
517
src/db.rs
@ -1,517 +0,0 @@
|
||||
use ansi_term::Colour;
|
||||
use rmp_serde::{Deserializer, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{BufReader, BufWriter, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use parking_lot::RwLock;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::interval;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Document {
|
||||
pub id: String,
|
||||
pub data: HashMap<String, serde_json::Value>,
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Index {
|
||||
primary: RwLock<BTreeMap<String, String>>, // key -> document_id
|
||||
secondary: RwLock<HashMap<String, Vec<String>>>, // field_value -> document_ids
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Transaction {
|
||||
pub operations: Vec<Operation>,
|
||||
pub committed: AtomicBool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Operation {
|
||||
Insert { doc: Document },
|
||||
Update { doc_id: String, data: HashMap<String, serde_json::Value> },
|
||||
Delete { doc_id: String },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Shard {
|
||||
pub data: RwLock<HashMap<String, Document>>,
|
||||
pub index: Index,
|
||||
pub wal: Option<Wal>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Wal {
|
||||
file: std::sync::Mutex<File>,
|
||||
sequence: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct NodeInfo {
|
||||
pub id: String,
|
||||
pub address: String,
|
||||
pub port: u16,
|
||||
pub status: NodeStatus,
|
||||
pub last_seen: u64,
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
|
||||
pub enum NodeStatus {
|
||||
Alive,
|
||||
Suspect,
|
||||
Dead,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct GossipMessage {
|
||||
pub from_node: String,
|
||||
pub nodes: Vec<NodeInfo>,
|
||||
pub sequence: u64,
|
||||
pub timestamp: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct GossipManager {
|
||||
pub node_id: String,
|
||||
pub nodes: RwLock<HashMap<String, NodeInfo>>,
|
||||
pub sequence: AtomicU64,
|
||||
pub tx: mpsc::Sender<GossipMessage>,
|
||||
pub rx: mpsc::Receiver<GossipMessage>,
|
||||
pub config: DatabaseConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FutriixDB {
|
||||
pub shards: Vec<Arc<Shard>>,
|
||||
pub config: DatabaseConfig,
|
||||
pub transactions: RwLock<HashMap<String, Transaction>>,
|
||||
pub lua_modules: RwLock<HashMap<String, String>>, // name -> lua code
|
||||
pub gossip: Arc<GossipManager>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DatabaseConfig {
|
||||
pub data_dir: String,
|
||||
pub wal_enabled: bool,
|
||||
pub wal_dir: String,
|
||||
pub replication_enabled: bool,
|
||||
pub replication_nodes: Vec<String>,
|
||||
pub sharding_enabled: bool,
|
||||
pub shards: usize,
|
||||
pub cluster_mode: bool,
|
||||
pub gossip_port: u16,
|
||||
pub gossip_interval_ms: u64,
|
||||
pub node_id: Option<String>,
|
||||
}
|
||||
|
||||
impl FutriixDB {
|
||||
pub fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
// Создаем директории если не существуют
|
||||
fs::create_dir_all(&config.data_dir)?;
|
||||
if config.wal_enabled {
|
||||
fs::create_dir_all(&config.wal_dir)?;
|
||||
}
|
||||
|
||||
let mut shards = Vec::new();
|
||||
for i in 0..config.shards {
|
||||
let shard = Arc::new(Shard::new(config, i)?);
|
||||
shards.push(shard);
|
||||
}
|
||||
|
||||
// Инициализируем GOSSIP менеджер
|
||||
let gossip = GossipManager::new(config)?;
|
||||
|
||||
Ok(Self {
|
||||
shards,
|
||||
config: config.clone(),
|
||||
transactions: RwLock::new(HashMap::new()),
|
||||
lua_modules: RwLock::new(HashMap::new()),
|
||||
gossip: Arc::new(gossip),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn insert(&self, key: &str, data: HashMap<String, serde_json::Value>) -> Result<(), String> {
|
||||
let shard = self.get_shard(key);
|
||||
let mut shard_data = shard.data.write();
|
||||
|
||||
let document = Document {
|
||||
id: key.to_string(),
|
||||
data,
|
||||
version: 0,
|
||||
};
|
||||
|
||||
if shard_data.contains_key(key) {
|
||||
return Err("Key already exists".to_string());
|
||||
}
|
||||
|
||||
// Добавляем в WAL если включен
|
||||
if let Some(wal) = &shard.wal {
|
||||
if let Err(e) = wal.write_operation(&Operation::Insert { doc: document.clone() }) {
|
||||
return Err(format!("WAL write error: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
shard_data.insert(key.to_string(), document);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &str) -> Result<Document, String> {
|
||||
let shard = self.get_shard(key);
|
||||
let shard_data = shard.data.read();
|
||||
|
||||
shard_data.get(key)
|
||||
.cloned()
|
||||
.ok_or_else(|| "Key not found".to_string())
|
||||
}
|
||||
|
||||
pub fn update(&self, key: &str, data: HashMap<String, serde_json::Value>) -> Result<(), String> {
|
||||
let shard = self.get_shard(key);
|
||||
let mut shard_data = shard.data.write();
|
||||
|
||||
let doc = shard_data.get_mut(key)
|
||||
.ok_or_else(|| "Key not found".to_string())?;
|
||||
|
||||
doc.data = data;
|
||||
doc.version += 1;
|
||||
|
||||
if let Some(wal) = &shard.wal {
|
||||
wal.write_operation(&Operation::Update {
|
||||
doc_id: key.to_string(),
|
||||
data: doc.data.clone()
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete(&self, key: &str) -> Result<(), String> {
|
||||
let shard = self.get_shard(key);
|
||||
let mut shard_data = shard.data.write();
|
||||
|
||||
if !shard_data.contains_key(key) {
|
||||
return Err("Key not found".to_string());
|
||||
}
|
||||
|
||||
if let Some(wal) = &shard.wal {
|
||||
wal.write_operation(&Operation::Delete {
|
||||
doc_id: key.to_string()
|
||||
})?;
|
||||
}
|
||||
|
||||
shard_data.remove(key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_shard(&self, key: &str) -> &Arc<Shard> {
|
||||
let hash = self.hash_key(key);
|
||||
&self.shards[hash % self.shards.len()]
|
||||
}
|
||||
|
||||
fn hash_key(&self, key: &str) -> usize {
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
let mut hasher = DefaultHasher::new();
|
||||
key.hash(&mut hasher);
|
||||
hasher.finish() as usize
|
||||
}
|
||||
|
||||
pub fn add_lua_module(&self, name: &str, code: &str) {
|
||||
self.lua_modules.write().insert(name.to_string(), code.to_string());
|
||||
}
|
||||
|
||||
pub async fn start_gossip(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if self.config.cluster_mode {
|
||||
self.gossip.start().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Команды для управления кластером
|
||||
pub fn gossip_add_node(&self, address: &str, port: u16) -> Result<(), String> {
|
||||
if !self.config.cluster_mode {
|
||||
return Err("Cluster mode is disabled".to_string());
|
||||
}
|
||||
|
||||
let node_id = format!("{}:{}", address, port);
|
||||
let node_info = NodeInfo {
|
||||
id: node_id.clone(),
|
||||
address: address.to_string(),
|
||||
port,
|
||||
status: NodeStatus::Alive,
|
||||
last_seen: chrono::Utc::now().timestamp_millis() as u64,
|
||||
version: 0,
|
||||
};
|
||||
|
||||
self.gossip.update_node(node_info);
|
||||
println!("GOSSIP: Node {}:{} added to cluster", address, port);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn gossip_remove_node(&self, node_id: &str) -> Result<(), String> {
|
||||
if !self.config.cluster_mode {
|
||||
return Err("Cluster mode is disabled".to_string());
|
||||
}
|
||||
|
||||
let mut nodes = self.gossip.nodes.write();
|
||||
if nodes.remove(node_id).is_some() {
|
||||
println!("GOSSIP: Node {} removed from cluster", node_id);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!("Node {} not found", node_id))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn gossip_list_nodes(&self) -> Result<Vec<NodeInfo>, String> {
|
||||
if !self.config.cluster_mode {
|
||||
return Err("Cluster mode is disabled".to_string());
|
||||
}
|
||||
|
||||
Ok(self.gossip.get_nodes())
|
||||
}
|
||||
|
||||
pub fn gossip_reconfigure(&self, new_nodes: Vec<(String, u16)>) -> Result<(), String> {
|
||||
if !self.config.cluster_mode {
|
||||
return Err("Cluster mode is disabled".to_string());
|
||||
}
|
||||
|
||||
let nodes_count = new_nodes.len();
|
||||
let mut nodes = self.gossip.nodes.write();
|
||||
nodes.clear();
|
||||
|
||||
for (address, port) in new_nodes {
|
||||
let node_id = format!("{}:{}", address, port);
|
||||
let node_info = NodeInfo {
|
||||
id: node_id.clone(),
|
||||
address: address.to_string(),
|
||||
port,
|
||||
status: NodeStatus::Alive,
|
||||
last_seen: chrono::Utc::now().timestamp_millis() as u64,
|
||||
version: 0,
|
||||
};
|
||||
nodes.insert(node_id, node_info);
|
||||
}
|
||||
|
||||
println!("GOSSIP: Cluster reconfigured with {} nodes", nodes_count);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Shard {
|
||||
fn new(config: &DatabaseConfig, index: usize) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let wal = if config.wal_enabled {
|
||||
let wal_path = PathBuf::from(&config.wal_dir).join(format!("shard_{}.wal", index));
|
||||
Some(Wal::new(wal_path)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
data: RwLock::new(HashMap::new()),
|
||||
index: Index::new(),
|
||||
wal,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Index {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
primary: RwLock::new(BTreeMap::new()),
|
||||
secondary: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Wal {
|
||||
fn new(path: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)?;
|
||||
|
||||
Ok(Self {
|
||||
file: std::sync::Mutex::new(file),
|
||||
sequence: AtomicU64::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
fn write_operation(&self, operation: &Operation) -> Result<(), String> {
|
||||
let mut buf = Vec::new();
|
||||
let mut serializer = Serializer::new(&mut buf);
|
||||
operation.serialize(&mut serializer)
|
||||
.map_err(|e| format!("Serialization error: {}", e))?;
|
||||
|
||||
let mut file = self.file.lock().unwrap();
|
||||
file.write_all(&buf)
|
||||
.map_err(|e| format!("Write error: {}", e))?;
|
||||
|
||||
file.write_all(b"\n")
|
||||
.map_err(|e| format!("Write error: {}", e))?;
|
||||
|
||||
self.sequence.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl GossipManager {
|
||||
fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
|
||||
let node_id = config.node_id.clone().unwrap_or_else(|| {
|
||||
format!("node-{}", Uuid::new_v4().to_string()[..8].to_string())
|
||||
});
|
||||
|
||||
println!("GOSSIP: Initializing with node ID: {}", node_id);
|
||||
println!("GOSSIP: Cluster mode: {}", config.cluster_mode);
|
||||
println!("GOSSIP: Port: {}", config.gossip_port);
|
||||
println!("GOSSIP: Interval: {}ms", config.gossip_interval_ms);
|
||||
|
||||
Ok(Self {
|
||||
node_id,
|
||||
nodes: RwLock::new(HashMap::new()),
|
||||
sequence: AtomicU64::new(0),
|
||||
tx,
|
||||
rx,
|
||||
config: config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if !self.config.cluster_mode {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!("GOSSIP: Starting gossip loop with {}ms interval", self.config.gossip_interval_ms);
|
||||
|
||||
let gossip_clone = self.clone_manager();
|
||||
tokio::spawn(async move {
|
||||
gossip_clone.run_gossip_loop().await;
|
||||
});
|
||||
|
||||
println!("Cluster: started successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_gossip_loop(&self) {
|
||||
if !self.config.cluster_mode {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut interval = interval(Duration::from_millis(self.config.gossip_interval_ms));
|
||||
let mut broadcast_count = 0;
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
// Ограничиваем частоту вывода сообщений в консоль
|
||||
if broadcast_count % 10 == 0 {
|
||||
let nodes_count = self.nodes.read().len();
|
||||
if nodes_count > 0 {
|
||||
println!("GOSSIP: Broadcasting to {} nodes", nodes_count);
|
||||
}
|
||||
}
|
||||
|
||||
self.broadcast_gossip().await;
|
||||
self.cleanup_dead_nodes();
|
||||
|
||||
broadcast_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_gossip(&self) {
|
||||
let nodes = self.nodes.read().clone();
|
||||
let message = GossipMessage {
|
||||
from_node: self.node_id.clone(),
|
||||
nodes: nodes.values().cloned().collect(),
|
||||
sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
|
||||
timestamp: chrono::Utc::now().timestamp_millis() as u64,
|
||||
};
|
||||
|
||||
// Здесь будет реализация рассылки сообщений другим узлам
|
||||
// через сетевые сокеты или другие транспортные механизмы
|
||||
}
|
||||
|
||||
fn cleanup_dead_nodes(&self) {
|
||||
let mut nodes = self.nodes.write();
|
||||
let now = chrono::Utc::now().timestamp_millis() as u64;
|
||||
let mut removed_count = 0;
|
||||
|
||||
nodes.retain(|_, node| {
|
||||
if node.status == NodeStatus::Dead {
|
||||
removed_count += 1;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Помечаем узлы как подозрительные если не видели их более 30 секунд
|
||||
if now - node.last_seen > 30000 {
|
||||
node.status = NodeStatus::Suspect;
|
||||
println!("GOSSIP: Node {} marked as suspect", node.id);
|
||||
}
|
||||
|
||||
// Помечаем узлы как мертвые если не видели их более 60 секунд
|
||||
if now - node.last_seen > 60000 {
|
||||
node.status = NodeStatus::Dead;
|
||||
println!("GOSSIP: Node {} marked as dead", node.id);
|
||||
removed_count += 1;
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
});
|
||||
|
||||
if removed_count > 0 {
|
||||
println!("GOSSIP: Removed {} dead nodes", removed_count);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_node(&self, node_info: NodeInfo) {
|
||||
let node_id = node_info.id.clone();
|
||||
let mut nodes = self.nodes.write();
|
||||
nodes.insert(node_info.id.clone(), node_info);
|
||||
println!("GOSSIP: Node {} updated", node_id);
|
||||
}
|
||||
|
||||
pub fn get_nodes(&self) -> Vec<NodeInfo> {
|
||||
let nodes = self.nodes.read();
|
||||
nodes.values().cloned().collect()
|
||||
}
|
||||
|
||||
fn clone_manager(&self) -> Arc<Self> {
|
||||
// Создаем новый канал для клонированного менеджера
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
|
||||
Arc::new(Self {
|
||||
node_id: self.node_id.clone(),
|
||||
nodes: RwLock::new(self.nodes.read().clone()),
|
||||
sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
|
||||
tx,
|
||||
rx,
|
||||
config: self.config.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Реализация ручной клонирования для GossipManager
|
||||
impl Clone for GossipManager {
|
||||
fn clone(&self) -> Self {
|
||||
// Создаем новый канал для клонированного менеджера
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
|
||||
Self {
|
||||
node_id: self.node_id.clone(),
|
||||
nodes: RwLock::new(self.nodes.read().clone()),
|
||||
sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
|
||||
tx,
|
||||
rx,
|
||||
config: self.config.clone(),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user