first commit

This commit is contained in:
Григорий Сафронов 2025-09-12 00:40:20 +03:00
commit 3260534f07
22 changed files with 6394 additions and 0 deletions

1794
Cargo.lock generated Normal file

File diff suppressed because it is too large

41
Cargo.toml Normal file

@ -0,0 +1,41 @@
[package]
name = "falcot"
version = "1.0.0"
edition = "2024"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rmp-serde = "1.1"
rmp = "0.8"
toml = "0.8"
rlua = "0.20.1"
crossbeam = "0.8"
dashmap = "5.0"
log = "0.4"
env_logger = "0.10"
anyhow = "1.0"
thiserror = "1.0"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.24"
rustls = "0.21"
rustls-pemfile = "1.0"
tokio-rustls = "0.24"
rcgen = "0.11" # For certificate generation
time = "0.3" # For time handling in generate_certs
siphasher = "1.0.1" # Added for consistent hashing

[[bin]]
name = "generate_certs"
path = "src/bin/generate_certs.rs"

[dev-dependencies]
tokio = { version = "1.0", features = ["full", "rt-multi-thread", "time"] }

[[test]]
name = "integration_tests"
path = "tests/integration_tests.rs"
harness = true

71
config.toml Normal file

@ -0,0 +1,71 @@
# config/falcot.toml
# Falcot Server configuration with a wait-free architecture

[server]
host = "127.0.0.1"
http_port = 8080
https_port = 8443
max_connections = 10000
connection_timeout = 30
http2_enabled = true

[tls]
enabled = true
cert_path = "/falcot/certs/server.crt"
key_path = "/falcot/certs/server.key"

[replication]
enabled = true
master_nodes = [
    "node1.falcot:9090",
    "node2.falcot:9090",
    "node3.falcot:9090"
]
sync_interval = 1000 # ms
replication_factor = 3

[acl]
enabled = false
allowed_ips = ["127.0.0.1", "192.168.1.0/24"]
denied_ips = ["10.0.0.5"]

[logging]
level = "info"
file_path = "/falcot/logs/falcot.log"
max_file_size = 10485760 # 10 MB
backup_count = 5

[sharding]
enabled = true
shards = 3
replication_factor = 2
auto_balance = true

[backup]
enabled = true
interval = 3600 # 1 hour
retention = 7 # days
path = "/falcot/backups"

[security]
require_authentication = false
jwt_secret = "your-secret-key-here"
password_hashing_rounds = 12

[performance]
max_memory_mb = 1024
cache_size_mb = 512
worker_threads = 4
io_threads = 2

[monitoring]
enabled = true
prometheus_port = 9090
health_check_interval = 30

[limits]
max_documents_per_collection = 1000000
max_collections = 1000
max_indexes_per_collection = 16
request_timeout_ms = 5000
max_request_size_mb = 10

195
falcot.log Normal file

@ -0,0 +1,195 @@
[2025-09-05 17:44:04] Starting Falcot server
[2025-09-05 17:44:04] Loading configuration from: config.toml
[2025-09-05 17:44:04] Database initialized with system collections
[2025-09-05 17:44:04] Server created successfully
[2025-09-05 17:44:04] Falcot Database Server started
[2025-09-05 17:44:04] Version: 0.1.0
[2025-09-05 17:44:04] HTTP server started on 127.0.0.1:8082
[2025-09-05 17:44:04] Starting Lua interpreter...
[2025-09-05 17:44:14] Falcot server stopped
[2025-09-05 17:44:23] Starting Falcot server
[2025-09-05 17:44:23] Loading configuration from: config.toml
[2025-09-05 17:44:23] Database initialized with system collections
[2025-09-05 17:44:23] Server created successfully
[2025-09-05 17:44:23] Falcot Database Server started
[2025-09-05 17:44:23] Version: 0.1.0
[2025-09-05 17:44:23] HTTP server started on 127.0.0.1:8082
[2025-09-05 17:44:23] Starting Lua interpreter...
[2025-09-05 17:44:56] Falcot server stopped
[2025-09-05 17:45:07] Starting Falcot server
[2025-09-05 17:45:07] Loading configuration from: config.toml
[2025-09-05 17:45:07] Database initialized with system collections
[2025-09-05 17:45:07] Server created successfully
[2025-09-05 17:45:07] Falcot Database Server started
[2025-09-05 17:45:07] Version: 0.1.0
[2025-09-05 17:45:07] HTTP server started on 127.0.0.1:8082
[2025-09-05 17:45:07] Starting Lua interpreter...
[2025-09-05 17:45:25] Falcot server stopped
[2025-09-05 17:45:42] Starting Falcot server
[2025-09-05 17:45:42] Loading configuration from: config.toml
[2025-09-05 17:45:42] Database initialized with system collections
[2025-09-05 17:45:42] Server created successfully
[2025-09-05 17:45:42] Falcot Database Server started
[2025-09-05 17:45:42] Version: 0.1.0
[2025-09-05 17:45:42] HTTP server started on 127.0.0.1:8082
[2025-09-05 17:45:42] Starting Lua interpreter...
[2025-09-05 17:59:02] Starting Falcot server
[2025-09-05 17:59:02] Loading configuration from: config.toml
[2025-09-05 17:59:02] Database initialized with system collections
[2025-09-05 17:59:02] Server created successfully
[2025-09-05 17:59:02] Falcot Database Server started
[2025-09-05 17:59:02] Version: 0.1.0
[2025-09-05 17:59:02] HTTP server started on 127.0.0.1:8082
[2025-09-05 17:59:02] Starting Lua interpreter...
[2025-09-05 20:19:00] Starting Falcot server
[2025-09-05 20:19:00] Loading configuration from: config.toml
[2025-09-05 20:19:00] Database initialized with system collections
[2025-09-05 20:19:00] Server created successfully
[2025-09-05 20:19:00] Falcot Database Server started
[2025-09-05 20:19:00] Version: 0.1.0
[2025-09-05 20:19:00] HTTP server started on 127.0.0.1:8082
[2025-09-05 20:19:00] Starting Lua interpreter...
[2025-09-05 20:19:14] Falcot server stopped
[2025-09-05 20:29:25] Starting Falcot server
[2025-09-05 20:29:25] Loading configuration from: config.toml
[2025-09-05 20:29:25] Database initialized with system collections
[2025-09-05 20:29:25] Server created successfully
[2025-09-05 20:29:25] Falcot Database Server started
[2025-09-05 20:29:25] Version: 0.1.0
[2025-09-05 20:29:25] HTTP server started on 127.0.0.1:8082
[2025-09-05 20:29:25] Starting Lua interpreter...
[2025-09-05 20:29:55] Starting Falcot server
[2025-09-05 20:29:55] Loading configuration from: config.toml
[2025-09-05 20:29:55] Database initialized with system collections
[2025-09-05 20:29:55] Server created successfully
[2025-09-05 20:29:55] Falcot Database Server started
[2025-09-05 20:29:55] Version: 0.1.0
[2025-09-05 20:29:55] HTTP server started on 127.0.0.1:8082
[2025-09-05 20:29:55] Starting Lua interpreter...
[2025-09-05 21:20:05] Starting Falcot server
[2025-09-05 21:20:05] Loading configuration from: config.toml
[2025-09-05 21:20:05] Database initialized with system collections
[2025-09-05 21:20:05] Server created successfully
[2025-09-05 21:20:05] Falcot Database Server started
[2025-09-05 21:20:05] Version: 0.1.0
[2025-09-05 21:29:07] Starting Falcot server
[2025-09-05 21:29:07] Loading configuration from: config.toml
[2025-09-05 21:29:07] Database initialized with system collections
[2025-09-05 21:29:07] Server created successfully
[2025-09-05 21:29:07] Falcot Database Server started
[2025-09-05 21:29:07] Version: 0.1.0
[2025-09-06 16:29:19] Starting Falcot server
[2025-09-06 16:29:19] Loading configuration from: config.toml
[2025-09-06 16:29:19] Database initialized with system collections
[2025-09-06 16:29:19] Server created successfully
[2025-09-06 16:29:19] Falcot Database Server started
[2025-09-06 16:29:19] Version: 0.1.0
[2025-09-06 17:06:56] Starting Falcot server
[2025-09-06 17:06:56] Loading configuration from: config.toml
[2025-09-06 17:06:56] Database initialized with system collections
[2025-09-06 17:06:56] Server created successfully
[2025-09-06 17:06:56] Falcot Database Server started
[2025-09-06 17:06:56] Version: 0.1.0
[2025-09-06 17:07:04] Starting Falcot server
[2025-09-06 17:07:04] Loading configuration from: config.toml
[2025-09-06 17:07:04] Database initialized with system collections
[2025-09-06 17:07:04] Server created successfully
[2025-09-06 17:07:04] Falcot Database Server started
[2025-09-06 17:07:04] Version: 0.1.0
[2025-09-06 17:30:20] Starting Falcot server
[2025-09-06 17:30:20] Loading configuration from: config.toml
[2025-09-06 17:30:20] Database initialized with system collections
[2025-09-06 17:30:20] Server created successfully
[2025-09-06 17:30:20] Falcot Database Server started
[2025-09-06 17:30:20] Version: 0.1.0
[2025-09-06 17:30:20] Starting Lua interpreter...
[2025-09-06 17:41:32] Starting Falcot server
[2025-09-06 17:41:32] Loading configuration from: config.toml
[2025-09-06 17:41:32] Database initialized with system collections
[2025-09-06 17:41:32] Server created successfully
[2025-09-06 17:41:32] Falcot Database Server started
[2025-09-06 17:41:32] Version: 0.1.0
[2025-09-06 17:41:32] Starting Lua interpreter...
[2025-09-06 17:41:40] Falcot server stopped
[2025-09-07 17:42:42] Starting Falcot server
[2025-09-07 17:42:42] Loading configuration from: config.toml
[2025-09-07 17:42:42] Database initialized with system collections
[2025-09-07 17:42:42] Server created successfully
[2025-09-07 17:42:42] Falcot Database Server started
[2025-09-07 17:42:42] Version: 0.1.0
[2025-09-07 17:42:42] Starting Lua interpreter...
[2025-09-07 17:42:42] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-09-07 17:42:47] Falcot server stopped
[2025-09-07 19:25:29] Starting Falcot server
[2025-09-07 19:25:29] Loading configuration from: config.toml
[2025-09-07 19:25:29] Database initialized with system collections
[2025-09-07 19:25:29] Server created successfully
[2025-09-07 19:25:29] Falcot Database Server started
[2025-09-07 19:25:29] Version: 0.1.0
[2025-09-07 19:25:29] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-09-07 19:25:29] Starting Lua interpreter...
[2025-09-07 19:34:43] Starting Falcot server
[2025-09-07 19:34:43] Loading configuration from: config.toml
[2025-09-07 19:34:43] Database initialized with system collections
[2025-09-07 19:34:43] Server created successfully
[2025-09-07 19:34:43] Falcot Database Server started
[2025-09-07 19:34:43] Version: 0.1.0
[2025-09-07 19:34:43] Starting Lua interpreter...
[2025-09-07 19:34:43] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-09-07 19:35:59] Starting Falcot server
[2025-09-07 19:35:59] Loading configuration from: config.toml
[2025-09-07 19:35:59] Database initialized with system collections
[2025-09-07 19:35:59] Server created successfully
[2025-09-07 19:35:59] Falcot Database Server started
[2025-09-07 19:35:59] Version: 0.1.0
[2025-09-07 19:35:59] HTTPS server started on 127.0.0.1:8443
[2025-09-07 19:35:59] Starting Lua interpreter...
[2025-09-07 19:36:29] Falcot server stopped
[2025-09-07 19:36:31] Starting Falcot server
[2025-09-07 19:36:31] Loading configuration from: config.toml
[2025-09-07 19:36:31] Database initialized with system collections
[2025-09-07 19:36:31] Server created successfully
[2025-09-07 19:36:31] Falcot Database Server started
[2025-09-07 19:36:31] Version: 0.1.0
[2025-09-07 19:36:31] Starting Lua interpreter...
[2025-09-07 19:36:31] HTTPS server started on 127.0.0.1:8443
[2025-09-07 21:39:52] Starting Falcot server
[2025-09-07 21:39:52] Loading configuration from: config.toml
[2025-09-07 21:39:52] Database initialized with system collections
[2025-09-07 21:39:52] Server created successfully
[2025-09-07 21:39:52] Falcot Database Server started
[2025-09-07 21:39:52] Version: 0.1.0
[2025-09-07 21:39:52] Starting Lua interpreter...
[2025-09-07 21:39:52] HTTPS server started on 127.0.0.1:8443
[2025-09-07 21:42:41] Starting Falcot server
[2025-09-07 21:42:41] Loading configuration from: config.toml
[2025-09-07 21:42:41] Database initialized with system collections
[2025-09-07 21:42:41] Server created successfully
[2025-09-07 21:42:41] Falcot Database Server started
[2025-09-07 21:42:41] Version: 0.1.0
[2025-09-07 21:42:41] Starting Lua interpreter...
[2025-09-07 21:43:03] Falcot server stopped
[2025-09-08 10:49:32] Starting Falcot server
[2025-09-08 10:49:32] Loading configuration from: config.toml
[2025-09-08 10:49:32] Database initialized with system collections
[2025-09-08 10:49:32] Server created successfully
[2025-09-08 10:49:32] Falcot Database Server started
[2025-09-08 10:49:32] Version: 0.1.0
[2025-09-08 10:49:32] Starting Lua interpreter...
[2025-09-08 10:49:42] Falcot server stopped
[2025-09-08 21:55:40] Starting Falcot server
[2025-09-08 21:55:40] Loading configuration from: config.toml
[2025-09-08 21:55:40] Database initialized with system collections
[2025-09-08 21:55:40] Server created successfully
[2025-09-08 21:55:40] Falcot Database Server started
[2025-09-08 21:55:40] Version: 1.0.0
[2025-09-08 21:55:40] Starting Lua interpreter...
[2025-09-08 21:55:42] Falcot server stopped
[2025-09-11 14:45:10] Starting Falcot server
[2025-09-11 14:45:10] Loading configuration from: config.toml
[2025-09-11 14:45:10] Database initialized with system collections
[2025-09-11 14:45:10] Server created successfully
[2025-09-11 14:45:10] Falcot Database Server started
[2025-09-11 14:45:10] Version: 1.0.0
[2025-09-11 14:45:10] Starting Lua interpreter...
[2025-09-11 14:45:47] Falcot server stopped

27
lua_scripts/init.lua Normal file

@ -0,0 +1,27 @@
-- lua_scripts/init.lua
-- Initialization Lua script for Falcot Server
falcot_log("Initializing Falcot Server v1.0.0 with Lua scripting...")

-- Make sure the backup namespace exists before attaching functions to it
-- (guards against the host not having pre-created these tables).
falcot = falcot or {}
falcot.engine = falcot.engine or {}
falcot.engine.backup = falcot.engine.backup or {}

-- Global helper functions for backups
function falcot.engine.backup.start()
    return falcot_db.backup_start()
end

function falcot.engine.backup.restore(backup_path)
    return falcot_db.backup_restore(backup_path)
end

-- Example: create a collection at startup
falcot_db.create("system_config", '{"key": "server_start_time", "value": "' .. os.date() .. '"}')
falcot_log("System configuration initialized")

-- Example ACL check
function check_access(ip_address)
    if ip_address == "127.0.0.1" then
        return true
    end
    return false
end

falcot_log("Lua initialization script completed")

4
scripts/example.lua Normal file

@ -0,0 +1,4 @@
-- Example Lua script for Falcot
falcot_log("Starting example script")
print("Hello World!")

37
src/bin/generate_certs.rs Normal file

@ -0,0 +1,37 @@
// src/bin/generate_certs.rs
use std::fs;
use std::io::Write;
use rcgen::{Certificate, CertificateParams};
use time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create the certificate directory if it does not exist
fs::create_dir_all("certs")?;
// Build the certificate parameters
let mut params = CertificateParams::default();
params.not_before = time::OffsetDateTime::now_utc();
params.not_after = time::OffsetDateTime::now_utc() + Duration::days(365); // 1 year
params.distinguished_name = rcgen::DistinguishedName::new();
params.distinguished_name.push(rcgen::DnType::CommonName, "localhost");
params.distinguished_name.push(rcgen::DnType::OrganizationName, "Falcot Server");
// Generate the certificate
let cert = Certificate::from_params(params)?;
// Save the certificate
let cert_pem = cert.serialize_pem()?;
let mut cert_file = fs::File::create("certs/cert.pem")?;
cert_file.write_all(cert_pem.as_bytes())?;
// Save the private key
let key_pem = cert.serialize_private_key_pem();
let mut key_file = fs::File::create("certs/key.pem")?;
key_file.write_all(key_pem.as_bytes())?;
println!("✅ Сертификаты успешно сгенерированы в папке certs/");
println!("📄 Сертификат: certs/cert.pem");
println!("🔑 Приватный ключ: certs/key.pem");
Ok(())
}
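
Because Cargo.toml declares a [[bin]] target for this helper, it can be run on its own, e.g. cargo run --bin generate_certs from the repository root; it writes certs/cert.pem and certs/key.pem, the same paths the TLS defaults in src/server/config.rs point to.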

62
src/client.rs Normal file

@ -0,0 +1,62 @@
// client.rs
//! Client for connecting to the Falcot server
//!
//! Talks to the server over a TCP connection,
//! using MessagePack for message serialization.
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::common::error::Result;
/// Falcot client
#[allow(dead_code)]
pub struct FalcotClient {
address: String,
}
#[allow(dead_code)]
impl FalcotClient {
pub fn new(address: &str) -> Self {
Self {
address: address.to_string(),
}
}
/// Connect to the server and execute a command
pub async fn execute_command(&self, command: crate::common::protocol::Command) -> Result<crate::common::protocol::Response> {
let mut socket = tokio::net::TcpStream::connect(&self.address).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
let (mut reader, mut writer) = socket.split();
// Send the command
self.send_message(&mut writer, &command).await?;
// Receive the response
let response: crate::common::protocol::Response = self.receive_message(&mut reader).await?;
Ok(response)
}
async fn send_message<T: serde::Serialize>(
&self,
writer: &mut (impl AsyncWriteExt + Unpin),
message: &T,
) -> Result<()> {
let bytes = crate::common::protocol::serialize(message)?;
writer.write_all(&bytes).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
Ok(())
}
async fn receive_message<T: for<'a> serde::Deserialize<'a>>(
&self,
reader: &mut (impl AsyncReadExt + Unpin),
) -> Result<T> {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
crate::common::protocol::deserialize(&buffer)
}
}
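
A minimal usage sketch for the client above (hypothetical glue code, assuming the crate's modules are in scope and a Falcot node is listening on the default address from src/server/config.rs):

use crate::client::FalcotClient;
use crate::common::error::Result;
use crate::common::protocol::{Command, Response};

async fn create_user_example() -> Result<()> {
    // 127.0.0.1:8081 mirrors the default host/port in config.rs; adjust as needed.
    let client = FalcotClient::new("127.0.0.1:8081");
    let response = client
        .execute_command(Command::Create {
            collection: "users".to_string(),
            document: br#"{"name": "alice"}"#.to_vec(),
        })
        .await?;
    match response {
        Response::Success(id) => println!("created document {}", String::from_utf8_lossy(&id)),
        Response::Error(e) => eprintln!("server error: {}", e),
    }
    Ok(())
}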

48
src/common/error.rs Normal file

@ -0,0 +1,48 @@
// src/common/error.rs
//! Error handling for Falcot
//!
//! Defines the error types and their handling for all components
//! of the wait-free system.
use thiserror::Error;
/// Main error type for Falcot
#[derive(Error, Debug)]
pub enum FalcotError {
#[error("Configuration error: {0}")]
ConfigError(String),
#[error("Database error: {0}")]
DatabaseError(String),
#[error("Lua error: {0}")]
LuaError(String),
#[error("Network error: {0}")]
NetworkError(String),
#[error("Replication error: {0}")]
ReplicationError(String),
#[error("HTTP error: {0}")]
HttpError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Unknown error: {0}")]
Unknown(String),
}
// Conversion from rlua::Error into FalcotError
impl From<rlua::Error> for FalcotError {
fn from(error: rlua::Error) -> Self {
FalcotError::LuaError(error.to_string())
}
}
/// Result type for Falcot
pub type Result<T> = std::result::Result<T, FalcotError>;
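
A short sketch (not part of the commit) of how the Result alias and the #[from] conversion compose: std::io::Error propagates through the ? operator automatically, while domain errors are built explicitly:

use crate::common::error::{FalcotError, Result};

fn read_config_bytes(path: &str) -> Result<Vec<u8>> {
    // std::io::Error is converted into FalcotError::IoError via #[from].
    let bytes = std::fs::read(path)?;
    if bytes.is_empty() {
        return Err(FalcotError::ConfigError(format!("{} is empty", path)));
    }
    Ok(bytes)
}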

8
src/common/mod.rs Normal file

@ -0,0 +1,8 @@
// common/mod.rs
//! Shared modules for Falcot
//!
//! Contains the common data structures, errors and protocols
//! used by all components of the wait-free system.
pub mod error;
pub mod protocol;

154
src/common/protocol.rs Normal file

@ -0,0 +1,154 @@
// src/common/protocol.rs
//! Data exchange protocol for Falcot
//!
//! Defines the command and response structures used for interaction
//! between system components, with wait-free serialization.
#![allow(dead_code)]
use serde::{Deserialize, Serialize};
use crate::server::database::Index;
/// Commands executed against the database
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
Create {
collection: String,
document: Vec<u8>,
},
Read {
collection: String,
id: String,
},
Update {
collection: String,
id: String,
document: Vec<u8>,
},
Delete {
collection: String,
id: String,
},
Query {
collection: String,
filter: Vec<u8>,
},
CreateProcedure {
name: String,
code: Vec<u8>,
},
CallProcedure {
name: String,
},
BeginTransaction {
transaction_id: String,
},
CommitTransaction {
transaction_id: String,
},
RollbackTransaction {
transaction_id: String,
},
CreateIndex {
collection: String,
index: Index,
},
QueryByIndex {
collection: String,
index_name: String,
value: Vec<u8>,
},
// New commands for sharding
AddShardNode {
node_id: String,
address: String,
capacity: u64,
},
RemoveShardNode {
node_id: String,
},
MigrateShard {
collection: String,
from_node: String,
to_node: String,
shard_key: String,
},
RebalanceCluster,
GetClusterStatus,
// Commands for constraints
AddConstraint {
collection: String,
constraint_name: String,
constraint_type: String,
field: String,
value: Vec<u8>,
},
RemoveConstraint {
collection: String,
constraint_name: String,
},
// Commands for compression
EnableCompression {
collection: String,
algorithm: String,
},
DisableCompression {
collection: String,
},
// Commands for global indexes
CreateGlobalIndex {
name: String,
field: String,
unique: bool,
},
QueryGlobalIndex {
index_name: String,
value: Vec<u8>,
},
}
/// Responses from the database
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
Success(Vec<u8>),
Error(String),
}
/// Replication message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMessage {
pub sequence: u64,
pub command: Command,
pub timestamp: i64,
}
/// Information about a shard node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardInfo {
pub node_id: String,
pub address: String,
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
}
/// Cluster status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
pub nodes: Vec<ShardInfo>,
pub total_capacity: u64,
pub total_used: u64,
pub rebalance_needed: bool,
}
/// Wait-free message serialization
pub fn serialize<T: serde::Serialize>(value: &T) -> crate::common::error::Result<Vec<u8>> {
rmp_serde::to_vec(value)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
/// Wait-free message deserialization
pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::error::Result<T> {
rmp_serde::from_slice(bytes)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
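
A hypothetical round-trip through the MessagePack helpers above (assuming the crate's modules are in scope), showing that a Command survives serialize/deserialize unchanged:

use crate::common::error::Result;
use crate::common::protocol::{self, Command};

fn roundtrip_example() -> Result<()> {
    let cmd = Command::Read {
        collection: "users".to_string(),
        id: "42".to_string(),
    };
    // MessagePack encoding and decoding via rmp-serde.
    let bytes = protocol::serialize(&cmd)?;
    let decoded: Command = protocol::deserialize(&bytes)?;
    match decoded {
        Command::Read { collection, id } => println!("read {} from {}", id, collection),
        _ => unreachable!(),
    }
    Ok(())
}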

1091
src/lua_shell.rs Normal file

File diff suppressed because it is too large

133
src/main.rs Normal file

@ -0,0 +1,133 @@
// src/main.rs
//! Main module of the Falcot server
//!
//! Application entry point; initializes the server and runs it.
//! Uses a wait-free architecture with lock-free data structures.
mod common;
mod server;
mod client;
mod lua_shell;
use std::env;
use std::fs::OpenOptions;
use std::io::Write;
use crate::common::error::FalcotError;
/// Log a message to the log file
fn log_to_file(message: &str) {
match OpenOptions::new()
.create(true)
.append(true)
.open("falcot.log")
{
Ok(mut file) => {
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
Err(e) => eprintln!("Failed to write to log file: {}", e),
}
}
/// Simple struct for command-line arguments
struct Args {
config: String,
debug: bool,
http_port: Option<u16>,
https_port: Option<u16>,
host: Option<String>,
}
/// Simple command-line argument parser
fn parse_args() -> Args {
let mut args = Args {
config: "config.toml".to_string(),
debug: false,
http_port: None,
https_port: None,
host: None,
};
let mut iter = env::args().skip(1);
while let Some(arg) = iter.next() {
match arg.as_str() {
"--config" | "-c" => {
if let Some(value) = iter.next() {
args.config = value;
}
}
"--debug" | "-d" => {
args.debug = true;
}
"--http-port" => {
if let Some(value) = iter.next() {
if let Ok(port) = value.parse() {
args.http_port = Some(port);
}
}
}
"--https-port" => {
if let Some(value) = iter.next() {
if let Ok(port) = value.parse() {
args.https_port = Some(port);
}
}
}
"--host" => {
if let Some(value) = iter.next() {
args.host = Some(value);
}
}
_ => {
if arg.starts_with("--config=") {
args.config = arg.trim_start_matches("--config=").to_string();
} else if arg.starts_with("-c=") {
args.config = arg.trim_start_matches("-c=").to_string();
}
}
}
}
args
}
#[tokio::main]
async fn main() -> Result<(), FalcotError> {
// Initialize file logging
log_to_file("Starting Falcot server");
// Parse command-line arguments
let args = parse_args();
let config_path = args.config;
// Print a blank line before loading the configuration
println!();
let message = format!("Loading configuration from: {}", config_path);
println!("{}", message);
log_to_file(&message);
// Create and run the server
match server::FalcotServer::new(&config_path).await {
Ok(server) => {
log_to_file("Server created successfully");
if let Err(e) = server.run().await {
let error_message = format!("Server error: {}", e);
eprintln!("{}", error_message);
log_to_file(&error_message);
std::process::exit(1);
}
}
Err(e) => {
let error_message = format!("Failed to create server: {}", e);
eprintln!("{}", error_message);
log_to_file(&error_message);
std::process::exit(1);
}
}
log_to_file("Falcot server stopped");
Ok(())
}
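
Taken together, parse_args accepts --config/-c (including the --config=path and -c=path forms), --debug/-d, --http-port, --https-port and --host, and ignores anything else. A typical development launch (binary name from Cargo.toml) would be: cargo run -- --config config.toml --debug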

239
src/server/config.rs Normal file

@ -0,0 +1,239 @@
// src/server/config.rs
//! Falcot server configuration
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
/// Server configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Config {
#[serde(default = "ServerConfig::default")]
pub server: ServerConfig,
#[serde(default = "ReplicationConfig::default")]
pub replication: ReplicationConfig,
#[serde(default = "LuaConfig::default")]
pub lua: LuaConfig,
#[serde(default = "AclConfig::default")]
pub acl: AclConfig,
#[serde(default = "TlsConfig::default")]
pub tls: TlsConfig,
}
/// Server parameters configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServerConfig {
#[serde(default = "default_host")]
pub host: String,
#[serde(default = "default_port")]
pub port: u16,
#[serde(default)]
pub http_port: Option<u16>,
#[serde(default)]
pub https_port: Option<u16>,
#[serde(default)]
pub http2_enabled: Option<bool>,
}
/// Replication configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_master_nodes")]
pub master_nodes: Vec<String>,
#[serde(default = "default_sync_interval")]
pub sync_interval: u64,
}
/// Lua configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LuaConfig {
#[serde(default = "default_scripts_dir")]
pub scripts_dir: String,
#[serde(default)]
pub auto_execute: Vec<String>,
}
/// ACL configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AclConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_allowed_ips")]
pub allowed_ips: Vec<String>,
#[serde(default)]
pub denied_ips: Vec<String>,
}
/// TLS configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TlsConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_cert_path")]
pub cert_path: String,
#[serde(default = "default_key_path")]
pub key_path: String,
}
// Default value helpers
fn default_host() -> String {
"127.0.0.1".to_string()
}
fn default_port() -> u16 {
8081
}
fn default_master_nodes() -> Vec<String> {
vec!["127.0.0.1:8081".to_string(), "127.0.0.1:8083".to_string()]
}
fn default_sync_interval() -> u64 {
5000
}
fn default_scripts_dir() -> String {
"lua_scripts".to_string()
}
fn default_allowed_ips() -> Vec<String> {
vec!["127.0.0.1".to_string(), "::1".to_string()]
}
fn default_cert_path() -> String {
"certs/cert.pem".to_string()
}
fn default_key_path() -> String {
"certs/key.pem".to_string()
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
host: default_host(),
port: default_port(),
http_port: Some(8082),
https_port: None,
http2_enabled: Some(false),
}
}
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
enabled: false,
master_nodes: default_master_nodes(),
sync_interval: default_sync_interval(),
}
}
}
impl Default for LuaConfig {
fn default() -> Self {
Self {
scripts_dir: default_scripts_dir(),
auto_execute: vec!["init.lua".to_string()],
}
}
}
impl Default for AclConfig {
fn default() -> Self {
Self {
enabled: false,
allowed_ips: default_allowed_ips(),
denied_ips: vec![],
}
}
}
impl Default for TlsConfig {
fn default() -> Self {
Self {
enabled: false,
cert_path: default_cert_path(),
key_path: default_key_path(),
}
}
}
impl Default for Config {
fn default() -> Self {
Self {
server: ServerConfig::default(),
replication: ReplicationConfig::default(),
lua: LuaConfig::default(),
acl: AclConfig::default(),
tls: TlsConfig::default(),
}
}
}
impl Config {
/// Load the configuration from a file
pub fn load(path: &str) -> Result<Self, crate::common::error::FalcotError> {
let path = Path::new(path);
if !path.exists() {
// Create a default configuration
let default_config = Config::default();
let toml_content = toml::to_string_pretty(&default_config)
.map_err(|e| crate::common::error::FalcotError::ConfigError(e.to_string()))?;
fs::write(path, toml_content)
.map_err(|e| crate::common::error::FalcotError::ConfigError(e.to_string()))?;
println!("Created default configuration file: {}", path.display());
return Ok(default_config);
}
let content = fs::read_to_string(path)
.map_err(|e| crate::common::error::FalcotError::ConfigError(e.to_string()))?;
// Parse the configuration, using default values where needed
let mut config: Config = toml::from_str(&content)
.map_err(|e| crate::common::error::FalcotError::ConfigError(e.to_string()))?;
// Make sure all fields fall back to their defaults when missing or empty
if config.server.host.is_empty() {
config.server.host = default_host();
}
if config.server.port == 0 {
config.server.port = default_port();
}
if config.replication.master_nodes.is_empty() {
config.replication.master_nodes = default_master_nodes();
}
if config.replication.sync_interval == 0 {
config.replication.sync_interval = default_sync_interval();
}
if config.lua.scripts_dir.is_empty() {
config.lua.scripts_dir = default_scripts_dir();
}
if config.acl.allowed_ips.is_empty() {
config.acl.allowed_ips = default_allowed_ips();
}
if config.tls.cert_path.is_empty() {
config.tls.cert_path = default_cert_path();
}
if config.tls.key_path.is_empty() {
config.tls.key_path = default_key_path();
}
Ok(config)
}
/// Save the configuration to a file
#[allow(dead_code)]
pub fn save(&self, path: &str) -> Result<(), crate::common::error::FalcotError> {
let toml_content = toml::to_string_pretty(self)
.map_err(|e| crate::common::error::FalcotError::ConfigError(e.to_string()))?;
fs::write(path, toml_content)
.map_err(|e| crate::common::error::FalcotError::ConfigError(e.to_string()))
}
}
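
A small sketch of the load-or-create behaviour above (hypothetical driver code, assuming the crate's modules are in scope):

use crate::server::config::Config;

fn load_config_example() -> Result<(), crate::common::error::FalcotError> {
    // If config.toml is missing, Config::load writes a default file and returns the defaults.
    let config = Config::load("config.toml")?;
    println!(
        "listening on {}:{} (TLS enabled: {})",
        config.server.host, config.server.port, config.tls.enabled
    );
    Ok(())
}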

669
src/server/database.rs Normal file

@ -0,0 +1,669 @@
// src/server/database.rs
//! Wait-free document-oriented database
//!
//! Implements wait-free data access using atomic references
//! and lock-free data structures for maximum performance.
#![allow(unused_imports)]
#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use serde::{Serialize, Deserialize};
use serde_json::Value;
use uuid::Uuid;
use dashmap::DashMap;
use crate::common::error::Result;
use crate::common::protocol;
/// Collection triggers
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TriggerEvent {
BeforeCreate,
AfterCreate,
BeforeUpdate,
AfterUpdate,
BeforeDelete,
AfterDelete,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trigger {
pub name: String,
pub event: TriggerEvent,
pub collection: String,
pub lua_code: String,
}
/// Index types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
Primary,
Secondary,
}
/// Index definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Index {
pub name: String,
pub index_type: IndexType,
pub field: String,
pub unique: bool,
}
/// Wait-free document collection
#[derive(Clone)]
pub struct Collection {
name: String,
documents: Arc<RwLock<HashMap<String, Vec<u8>>>>,
sequence: Arc<AtomicU64>,
triggers: Arc<RwLock<Vec<Trigger>>>,
indexes: Arc<RwLock<HashMap<String, Index>>>,
index_data: Arc<DashMap<String, HashMap<String, Vec<String>>>>,
}
impl Collection {
/// Create a new wait-free collection
pub fn new(name: String) -> Self {
Self {
name,
documents: Arc::new(RwLock::new(HashMap::new())),
sequence: Arc::new(AtomicU64::new(0)),
triggers: Arc::new(RwLock::new(Vec::new())),
indexes: Arc::new(RwLock::new(HashMap::new())),
index_data: Arc::new(DashMap::new()),
}
}
/// Add a trigger
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
let mut triggers = self.triggers.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
triggers.push(trigger);
Ok(())
}
/// Get the triggers registered for an event
#[allow(dead_code)]
pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result<Vec<Trigger>> {
let triggers = self.triggers.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(triggers.iter()
.filter(|t| t.event == event)
.cloned()
.collect())
}
/// Create an index
pub fn create_index(&self, index: Index) -> Result<()> {
let mut indexes = self.indexes.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if indexes.contains_key(&index.name) {
return Err(crate::common::error::FalcotError::DatabaseError(
format!("Index already exists: {}", index.name)
));
}
// Create the storage structure for the index data
self.index_data.insert(index.name.clone(), HashMap::new());
let index_clone = index.clone();
indexes.insert(index.name.clone(), index);
// Rebuild the index for existing documents
self.rebuild_index(&index_clone.name)?;
Ok(())
}
/// Rebuild an index
fn rebuild_index(&self, index_name: &str) -> Result<()> {
let indexes = self.indexes.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let index = indexes.get(index_name)
.ok_or_else(|| crate::common::error::FalcotError::DatabaseError(
format!("Index not found: {}", index_name)
))?;
let documents = self.documents.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut index_map = HashMap::new();
for (id, document_bytes) in documents.iter() {
if let Ok(document) = serde_json::from_slice::<Value>(document_bytes) {
if let Some(field_value) = document.get(&index.field) {
// Convert the value to a string so it can be used as a HashMap key
let value_str = field_value.to_string();
let entry = index_map.entry(value_str).or_insert_with(Vec::new);
entry.push(id.clone());
// Uniqueness check for unique indexes
if index.unique && entry.len() > 1 {
return Err(crate::common::error::FalcotError::DatabaseError(
format!("Duplicate value {} for unique index {}", field_value, index_name)
));
}
}
}
}
self.index_data.insert(index_name.to_string(), index_map);
Ok(())
}
/// Update the indexes when a document is created, updated or deleted
fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> {
let indexes = self.indexes.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// An empty new_document means the document is being deleted:
// only the old values have to be removed from the indexes.
let new_doc_value: Option<Value> = if new_document.is_empty() {
None
} else {
Some(serde_json::from_slice(new_document)
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?)
};
let old_doc_value: Option<Value> = old_document
.and_then(|doc| serde_json::from_slice(doc).ok());
for (index_name, index) in indexes.iter() {
if let Some(mut index_map) = self.index_data.get_mut(index_name) {
// Remove the old values from the index
if let Some(old_doc) = &old_doc_value {
if let Some(old_value) = old_doc.get(&index.field) {
let old_value_str = old_value.to_string();
if let Some(entries) = index_map.get_mut(&old_value_str) {
entries.retain(|id| id != document_id);
if entries.is_empty() {
index_map.remove(&old_value_str);
}
}
}
}
// Add the new values to the index
if let Some(new_value) = new_doc_value.as_ref().and_then(|doc| doc.get(&index.field)) {
let new_value_str = new_value.to_string();
let entries = index_map.entry(new_value_str).or_insert_with(Vec::new);
// Uniqueness check
if index.unique && !entries.is_empty() && entries[0] != document_id {
return Err(crate::common::error::FalcotError::DatabaseError(
format!("Duplicate value {} for unique index {}", new_value, index_name)
));
}
if !entries.contains(&document_id.to_string()) {
entries.push(document_id.to_string());
}
}
}
}
Ok(())
}
/// Query by index
pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result<Vec<String>> {
let index_map = self.index_data.get(index_name)
.ok_or_else(|| crate::common::error::FalcotError::DatabaseError(
format!("Index not found: {}", index_name)
))?;
let value_str = value.to_string();
Ok(index_map.get(&value_str).cloned().unwrap_or_default())
}
/// Wait-free document creation
pub fn create_document(&self, document: Vec<u8>) -> Result<String> {
let id = Uuid::new_v4().to_string();
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let mut documents = self.documents.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Check uniqueness before inserting
self.update_indexes(None, &document, &id)?;
documents.insert(id.clone(), document);
println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq);
Ok(id)
}
/// Wait-free document read
pub fn read_document(&self, id: &str) -> Result<Option<Vec<u8>>> {
let documents = self.documents.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(documents.get(id).cloned())
}
/// Wait-free document update
pub fn update_document(&self, id: &str, document: Vec<u8>) -> Result<()> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let mut documents = self.documents.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(old_document) = documents.get(id) {
// Update the indexes
self.update_indexes(Some(old_document), &document, id)?;
documents.insert(id.to_string(), document);
println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq);
Ok(())
} else {
Err(crate::common::error::FalcotError::DatabaseError(
format!("Document not found: {}", id)
))
}
}
/// Wait-free document deletion
pub fn delete_document(&self, id: &str) -> Result<()> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let mut documents = self.documents.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(old_document) = documents.get(id) {
// Remove from the indexes
self.update_indexes(Some(old_document), &[], id)?;
documents.remove(id);
println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq);
Ok(())
} else {
Err(crate::common::error::FalcotError::DatabaseError(
format!("Document not found: {}", id)
))
}
}
/// Wait-free document query
pub fn query_documents(&self, _filter: Vec<u8>) -> Result<Vec<Vec<u8>>> {
let documents = self.documents.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// TODO: Implement wait-free filtering based on the filter argument
let documents: Vec<Vec<u8>> = documents.values().cloned().collect();
Ok(documents)
}
/// Get the collection name (wait-free)
#[allow(dead_code)]
pub fn get_name(&self) -> &str {
&self.name
}
/// Get the number of documents (wait-free)
#[allow(dead_code)]
pub fn count_documents(&self) -> Result<usize> {
let documents = self.documents.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(documents.len())
}
}
/// Wait-free database
#[derive(Clone)]
pub struct Database {
collections: Arc<RwLock<HashMap<String, Collection>>>,
procedures: Arc<RwLock<HashMap<String, Vec<u8>>>>,
transactions: Arc<RwLock<HashMap<String, Vec<protocol::Command>>>>,
}
impl Database {
/// Create a new wait-free database
pub fn new() -> Self {
Self {
collections: Arc::new(RwLock::new(HashMap::new())),
procedures: Arc::new(RwLock::new(HashMap::new())),
transactions: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Wait-free get-or-create of a collection
pub fn get_collection(&self, name: &str) -> Collection {
let mut collections = self.collections.write().unwrap();
if let Some(collection) = collections.get(name) {
return collection.clone();
}
// Create the new collection in a wait-free manner
let new_collection = Collection::new(name.to_string());
collections.insert(name.to_string(), new_collection.clone());
new_collection
}
/// Wait-free command execution
pub fn execute_command(&self, command: protocol::Command) -> Result<protocol::Response> {
match command {
protocol::Command::Create { collection, document } => {
let coll = self.get_collection(&collection);
match coll.create_document(document) {
Ok(id) => Ok(protocol::Response::Success(id.into_bytes())),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Read { collection, id } => {
let coll = self.get_collection(&collection);
match coll.read_document(&id) {
Ok(Some(document)) => Ok(protocol::Response::Success(document)),
Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Update { collection, id, document } => {
let coll = self.get_collection(&collection);
match coll.update_document(&id, document) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Delete { collection, id } => {
let coll = self.get_collection(&collection);
match coll.delete_document(&id) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Query { collection, filter } => {
let coll = self.get_collection(&collection);
match coll.query_documents(filter) {
Ok(documents) => {
let json_docs: Vec<Value> = documents.into_iter()
.filter_map(|doc| serde_json::from_slice(&doc).ok())
.collect();
match serde_json::to_vec(&json_docs) {
Ok(data) => Ok(protocol::Response::Success(data)),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::CreateProcedure { name, code } => {
let mut procedures = self.procedures.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
procedures.insert(name, code);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CallProcedure { name } => {
let procedures = self.procedures.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if procedures.contains_key(&name) {
// TODO: Execute the procedure's Lua code
Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes()))
} else {
Ok(protocol::Response::Error(format!("Procedure not found: {}", name)))
}
}
protocol::Command::BeginTransaction { transaction_id } => {
let mut transactions = self.transactions.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if transactions.contains_key(&transaction_id) {
return Ok(protocol::Response::Error("Transaction already exists".to_string()));
}
transactions.insert(transaction_id, Vec::new());
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CommitTransaction { transaction_id } => {
let mut transactions = self.transactions.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(commands) = transactions.remove(&transaction_id) {
// Execute all the transaction's commands in a wait-free manner
for cmd in commands {
if let Err(e) = self.execute_command(cmd) {
return Ok(protocol::Response::Error(format!("Transaction failed: {}", e)));
}
}
Ok(protocol::Response::Success(vec![]))
} else {
Ok(protocol::Response::Error("Transaction not found".to_string()))
}
}
protocol::Command::RollbackTransaction { transaction_id } => {
let mut transactions = self.transactions.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if transactions.remove(&transaction_id).is_some() {
Ok(protocol::Response::Success(vec![]))
} else {
Ok(protocol::Response::Error("Transaction not found".to_string()))
}
}
protocol::Command::CreateIndex { collection, index } => {
let coll = self.get_collection(&collection);
match coll.create_index(index) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::QueryByIndex { collection, index_name, value } => {
let coll = self.get_collection(&collection);
let value: Value = serde_json::from_slice(&value)
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
match coll.query_by_index(&index_name, &value) {
Ok(document_ids) => {
let result = serde_json::to_vec(&document_ids)
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
Ok(protocol::Response::Success(result))
}
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
// Handling of the new commands for sharding, constraints, compression and global indexes
protocol::Command::AddShardNode { node_id, address, capacity } => {
// TODO: Implement adding a shard node
println!("Adding shard node: {} at {} with capacity {}", node_id, address, capacity);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::RemoveShardNode { node_id } => {
// TODO: Implement removing a shard node
println!("Removing shard node: {}", node_id);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::MigrateShard { collection, from_node, to_node, shard_key } => {
// TODO: Implement shard migration
println!("Migrating shard from {} to {} for collection {} with key {}", from_node, to_node, collection, shard_key);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::RebalanceCluster => {
// TODO: Implement cluster rebalancing
println!("Rebalancing cluster");
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::GetClusterStatus => {
// TODO: Implement cluster status retrieval
let status = protocol::ClusterStatus {
nodes: vec![],
total_capacity: 0,
total_used: 0,
rebalance_needed: false,
};
match protocol::serialize(&status) {
Ok(data) => Ok(protocol::Response::Success(data)),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::AddConstraint { collection, constraint_name, constraint_type, field, value } => {
// TODO: Implement adding a constraint
println!("Adding constraint {} to collection {}: {} {} with value {:?}", constraint_name, collection, constraint_type, field, value);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::RemoveConstraint { collection, constraint_name } => {
// TODO: Implement removing a constraint
println!("Removing constraint {} from collection {}", constraint_name, collection);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::EnableCompression { collection, algorithm } => {
// TODO: Implement enabling compression
println!("Enabling {} compression for collection {}", algorithm, collection);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::DisableCompression { collection } => {
// TODO: Implement disabling compression
println!("Disabling compression for collection {}", collection);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CreateGlobalIndex { name, field, unique } => {
// TODO: Implement global index creation
println!("Creating global index {} on field {} (unique: {})", name, field, unique);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::QueryGlobalIndex { index_name, value } => {
// TODO: Implement querying a global index
println!("Querying global index {} with value {:?}", index_name, value);
Ok(protocol::Response::Success(vec![]))
}
}
}
/// Wait-free database statistics
#[allow(dead_code)]
pub fn get_stats(&self) -> Result<HashMap<String, usize>> {
let collections = self.collections.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let procedures = self.procedures.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let transactions = self.transactions.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut stats = HashMap::new();
stats.insert("collections".to_string(), collections.len());
stats.insert("procedures".to_string(), procedures.len());
stats.insert("active_transactions".to_string(), transactions.len());
// Count documents across all collections
let total_documents: usize = collections.values()
.map(|coll| coll.count_documents().unwrap_or(0))
.sum();
stats.insert("total_documents".to_string(), total_documents);
Ok(stats)
}
/// Create a backup of the database
pub fn create_backup(&self) -> Result<HashMap<String, HashMap<String, Vec<u8>>>> {
let collections = self.collections.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut backup = HashMap::new();
for (name, collection) in collections.iter() {
let documents = collection.documents.read()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut collection_backup = HashMap::new();
for (id, document) in documents.iter() {
collection_backup.insert(id.clone(), document.clone());
}
backup.insert(name.clone(), collection_backup);
}
Ok(backup)
}
/// Restore from a backup
pub fn restore_from_backup(&self, backup: HashMap<String, HashMap<String, Vec<u8>>>) -> Result<()> {
let mut collections = self.collections.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Clear the existing collections
collections.clear();
// Restore the data from the backup
for (collection_name, documents) in backup {
let collection = Collection::new(collection_name.clone());
{
let mut collection_docs = collection.documents.write()
.map_err(|e| crate::common::error::FalcotError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
for (id, document) in documents {
collection_docs.insert(id, document);
}
} // collection_docs goes out of scope here
collections.insert(collection_name, collection);
}
Ok(())
}
/// Add a trigger to a collection
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
let collection = self.get_collection(&trigger.collection);
collection.add_trigger(trigger)
}
}
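
An in-process usage sketch of the command dispatcher above (hypothetical, not part of the commit): create a document and read it back through execute_command:

use crate::common::protocol::{Command, Response};
use crate::server::database::Database;

fn create_and_read_example() -> crate::common::error::Result<()> {
    let db = Database::new();
    // Create a document; the Success payload is the generated UUID as bytes.
    let id = match db.execute_command(Command::Create {
        collection: "users".to_string(),
        document: br#"{"name": "alice"}"#.to_vec(),
    })? {
        Response::Success(id) => String::from_utf8_lossy(&id).to_string(),
        Response::Error(e) => return Err(crate::common::error::FalcotError::DatabaseError(e)),
    };
    // Read it back by id.
    if let Response::Success(doc) = db.execute_command(Command::Read {
        collection: "users".to_string(),
        id,
    })? {
        println!("document: {}", String::from_utf8_lossy(&doc));
    }
    Ok(())
}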

357
src/server/http.rs Normal file

@ -0,0 +1,357 @@
// src/server/http.rs
//! HTTP/HTTPS server with wait-free request handling
#![allow(dead_code)]
#![allow(unused_variables)]
use std::sync::Arc;
use hyper::{Body, Request, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use crate::common::error::Result;
use crate::server::database::Database;
/// Static files configuration
#[derive(Clone)]
pub struct StaticFilesConfig {
pub enabled: bool,
pub directory: String,
}
impl Default for StaticFilesConfig {
fn default() -> Self {
Self {
enabled: true,
directory: "static".to_string(),
}
}
}
/// TLS configuration
#[derive(Clone)]
pub struct TlsConfig {
pub enabled: bool,
pub cert_path: String,
pub key_path: String,
}
impl Default for TlsConfig {
fn default() -> Self {
Self {
enabled: true,
cert_path: "certs/cert.pem".to_string(),
key_path: "certs/key.pem".to_string(),
}
}
}
/// HTTP server configuration
#[derive(Clone)]
pub struct HttpConfig {
pub enabled: bool,
pub port: u16,
pub http2_enabled: bool,
}
/// ACL configuration
#[derive(Clone)]
pub struct AclConfig {
pub enabled: bool,
pub allowed_ips: Vec<String>,
pub denied_ips: Vec<String>,
}
impl Default for AclConfig {
fn default() -> Self {
Self {
enabled: false,
allowed_ips: vec!["127.0.0.1".to_string(), "::1".to_string()],
denied_ips: vec![],
}
}
}
/// Wait-free HTTP request handler with ACL support
async fn handle_request(
req: Request<Body>,
db: Arc<Database>,
static_config: StaticFilesConfig,
acl_config: AclConfig,
) -> Result<Response<Body>> {
// ACL check, if enabled
if acl_config.enabled {
if let Some(remote_addr) = req.extensions().get::<std::net::SocketAddr>() {
let ip = remote_addr.ip().to_string();
// Check denied IPs
if acl_config.denied_ips.contains(&ip) {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::from("Access denied"))
.unwrap());
}
// Check allowed IPs (when the list is not empty)
if !acl_config.allowed_ips.is_empty() && !acl_config.allowed_ips.contains(&ip) {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::from("Access denied"))
.unwrap());
}
}
}
let path = req.uri().path();
// Handle API requests
if path.starts_with("/api/") {
handle_api_request(req, db).await
}
// Serve static files
else if static_config.enabled {
handle_static_file(path, static_config).await
}
// Root path
else if path == "/" {
Ok(Response::new(Body::from("Falcot Database Server - HTTP API")))
}
// 404 for everything else
else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not Found"))
.unwrap())
}
}
/// Wait-free API request handling
async fn handle_api_request(
_req: Request<Body>,
_db: Arc<Database>,
) -> Result<Response<Body>> {
// TODO: Implement wait-free handling of CRUD operations over HTTP
Ok(Response::new(Body::from(r#"{"status": "ok", "message": "Falcot Server is running"}"#)))
}
/// Wait-free static file serving
async fn handle_static_file(
path: &str,
config: StaticFilesConfig,
) -> Result<Response<Body>> {
let file_path = if path == "/" {
format!("{}/index.html", config.directory)
} else {
format!("{}{}", config.directory, path)
};
match File::open(&file_path).await {
Ok(mut file) => {
let mut contents = Vec::new();
file.read_to_end(&mut contents).await
.map_err(|e: std::io::Error| crate::common::error::FalcotError::HttpError(e.to_string()))?;
let content_type = get_content_type(&file_path);
Ok(Response::builder()
.header("Content-Type", content_type)
.body(Body::from(contents))
.unwrap())
}
Err(_) => {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("File not found"))
.unwrap())
}
}
}
/// Determine the Content-Type from the file extension
fn get_content_type(file_path: &str) -> &'static str {
if file_path.ends_with(".html") {
"text/html"
} else if file_path.ends_with(".css") {
"text/css"
} else if file_path.ends_with(".js") {
"application/javascript"
} else if file_path.ends_with(".png") {
"image/png"
} else if file_path.ends_with(".jpg") || file_path.ends_with(".jpeg") {
"image/jpeg"
} else if file_path.ends_with(".json") {
"application/json"
} else if file_path.ends_with(".mp3") {
"audio/mpeg"
} else if file_path.ends_with(".mp4") {
"video/mp4"
} else {
"text/plain"
}
}
/// Start the HTTP server with the wait-free architecture
pub async fn start_http_server(
addr: &str,
db: Arc<Database>,
static_config: StaticFilesConfig,
http_config: HttpConfig,
acl_config: AclConfig,
) -> Result<()> {
let addr_parsed: std::net::SocketAddr = addr.parse()
.map_err(|e: std::net::AddrParseError| crate::common::error::FalcotError::HttpError(e.to_string()))?;
let db_clone = db.clone();
let static_clone = static_config.clone();
let acl_clone = acl_config.clone();
// Create the wait-free service
let make_svc = make_service_fn(move |_conn| {
let db = db_clone.clone();
let static_config = static_clone.clone();
let acl_config = acl_clone.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
handle_request(req, db.clone(), static_config.clone(), acl_config.clone())
}))
}
});
// Start the server
let builder = Server::bind(&addr_parsed);
let server = if http_config.http2_enabled {
// Enable HTTP/2 for the HTTP server
builder.http2_only(true).serve(make_svc)
} else {
// Use HTTP/1.1
builder.serve(make_svc)
};
// Run the server until it exits
server.await
.map_err(|e: hyper::Error| crate::common::error::FalcotError::HttpError(e.to_string()))?;
Ok(())
}
/// Start the HTTPS server with the wait-free architecture
// Note: unlike start_http_server, this function takes no http_config parameter
pub async fn start_https_server(
addr: &str,
db: Arc<Database>,
static_config: StaticFilesConfig,
tls_config: TlsConfig,
acl_config: AclConfig,
) -> Result<()> {
use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;
use rustls::{Certificate, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, pkcs8_private_keys};
use std::fs::File;
use std::io::BufReader;
if !tls_config.enabled {
println!("HTTPS disabled in configuration");
return Ok(());
}
    // Load the TLS certificate and key
let cert_file = File::open(&tls_config.cert_path)
.map_err(|e| crate::common::error::FalcotError::HttpError(format!("Failed to open certificate: {}", e)))?;
let key_file = File::open(&tls_config.key_path)
.map_err(|e| crate::common::error::FalcotError::HttpError(format!("Failed to open key: {}", e)))?;
    // Load the certificate chain
let cert_chain: Vec<Certificate> = certs(&mut BufReader::new(cert_file))
.map_err(|e| crate::common::error::FalcotError::HttpError(format!("Failed to parse certificate: {}", e)))?
.into_iter()
.map(Certificate)
.collect();
    // Load the private key
let mut keys: Vec<PrivateKey> = pkcs8_private_keys(&mut BufReader::new(key_file))
.map_err(|e| crate::common::error::FalcotError::HttpError(format!("Failed to parse key: {}", e)))?
.into_iter()
.map(PrivateKey)
.collect();
if keys.is_empty() {
return Err(crate::common::error::FalcotError::HttpError("No private keys found".to_string()));
}
    // Build the TLS server configuration
let server_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(cert_chain, keys.remove(0))
.map_err(|e| crate::common::error::FalcotError::HttpError(format!("Failed to create server config: {}", e)))?;
let addr_parsed: std::net::SocketAddr = addr.parse()
.map_err(|e: std::net::AddrParseError| crate::common::error::FalcotError::HttpError(e.to_string()))?;
let db_clone = db.clone();
let static_clone = static_config.clone();
let acl_clone = acl_config.clone();
    // Create the TLS acceptor
let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
    // Create the TCP listener
let tcp_listener = TcpListener::bind(&addr_parsed).await
.map_err(|e| crate::common::error::FalcotError::HttpError(format!("Failed to bind to {}: {}", addr, e)))?;
println!("HTTPS server starting on {}...", addr);
    // Accept and handle incoming connections
while let Ok((tcp_stream, _)) = tcp_listener.accept().await {
let tls_acceptor = tls_acceptor.clone();
let db = db_clone.clone();
let static_config = static_clone.clone();
let acl_config = acl_clone.clone();
tokio::spawn(async move {
            // Perform the TLS handshake
let tls_stream = match tls_acceptor.accept(tcp_stream).await {
Ok(stream) => stream,
Err(e) => {
eprintln!("TLS handshake failed: {}", e);
return;
}
};
            // Build the service for this connection
let service = service_fn(move |req| {
handle_request(req, db.clone(), static_config.clone(), acl_config.clone())
});
            // Serve HTTP requests over the TLS stream
if let Err(e) = hyper::server::conn::Http::new()
.serve_connection(tls_stream, service)
.await
{
eprintln!("HTTP server error: {}", e);
}
});
}
Ok(())
}
// Helper function for logging to a file
fn log_to_file(message: &str) {
use std::fs::OpenOptions;
use std::io::Write;
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open("falcot.log")
{
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
}
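// A minimal sketch of the mapping performed by get_content_type above
// (illustrative only; the expected values are taken directly from that function).
#[cfg(test)]
mod content_type_tests {
    use super::get_content_type;
    #[test]
    fn maps_common_extensions() {
        assert_eq!(get_content_type("static/index.html"), "text/html");
        assert_eq!(get_content_type("static/app.js"), "application/javascript");
        assert_eq!(get_content_type("static/data.json"), "application/json");
        // Unknown extensions fall back to plain text.
        assert_eq!(get_content_type("static/archive.tar"), "text/plain");
    }
}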

547
src/server/lua_engine.rs Normal file
View File

@ -0,0 +1,547 @@
// src/server/lua_engine.rs
//! Embedded Lua interpreter (based on rlua)
use rlua::{Lua, RluaCompat};
use std::sync::Arc;
use std::fs;
use std::path::Path;
use std::collections::HashMap;
use crate::common::error::Result;
use crate::common::protocol;
use crate::server::database::{Trigger, TriggerEvent, Index, IndexType};
/// Lua engine for executing scripts
#[derive(Clone)]
pub struct LuaEngine {
lua: Arc<Lua>,
}
impl LuaEngine {
pub fn new() -> Result<Self> {
let lua = Lua::new();
        // Set up the Lua environment
lua.load(r#"
function falcot_log(message)
print("LUA: " .. message)
end
function falcot_error(message)
print("LUA ERROR: " .. message)
end
"#).exec()?;
Ok(Self { lua: Arc::new(lua) })
}
    /// Executes a Lua script from a file
#[allow(dead_code)]
pub fn execute_script_file(&self, file_path: &str) -> Result<()> {
let script_content = fs::read_to_string(file_path)
.map_err(|e| crate::common::error::FalcotError::LuaError(format!("Failed to read script file {}: {}", file_path, e)))?;
self.execute_script(&script_content)
}
    /// Executes a Lua script from a string
pub fn execute_script(&self, script: &str) -> Result<()> {
let lua = self.lua.clone();
lua.load(script).exec()?;
Ok(())
}
    /// Executes the given scripts from a directory
#[allow(dead_code)]
pub fn execute_scripts_from_dir(&self, dir_path: &str, script_names: &[String]) -> Result<()> {
let path = Path::new(dir_path);
if !path.exists() {
println!("Lua scripts directory does not exist: {}", path.display());
return Ok(());
}
if !path.is_dir() {
return Err(crate::common::error::FalcotError::LuaError(
format!("Lua scripts path is not a directory: {}", path.display())
));
}
for script_name in script_names {
let script_path = path.join(script_name);
if script_path.exists() && script_path.is_file() {
println!("Executing Lua script: {}", script_path.display());
match self.execute_script_file(script_path.to_str().unwrap()) {
Ok(_) => println!("✓ Script executed successfully: {}", script_name),
Err(e) => eprintln!("✗ Failed to execute script {}: {}", script_name, e),
}
} else {
println!("Script not found: {}", script_path.display());
}
}
Ok(())
}
    /// Registers database functions in Lua
pub fn register_db_functions(&self, db: Arc<crate::server::database::Database>) -> Result<()> {
let lua = self.lua.clone();
        // Create the table that holds the DB functions
let falcot_db = lua.create_table()?;
        // create function
let db_clone = db.clone();
falcot_db.set("create", lua.create_function(move |_, (collection, data): (String, String)| {
let command = protocol::Command::Create {
collection,
document: data.into_bytes(),
};
match db_clone.execute_command(command) {
Ok(_) => Ok(()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // read function
let db_clone = db.clone();
falcot_db.set("read", lua.create_function(move |_, (collection, id): (String, String)| {
let command = protocol::Command::Read {
collection,
id,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => {
Ok(String::from_utf8_lossy(&data).to_string())
}
protocol::Response::Error(e) => {
Err(rlua::Error::RuntimeError(e))
}
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // update function
let db_clone = db.clone();
falcot_db.set("update", lua.create_function(move |_, (collection, id, data): (String, String, String)| {
let command = protocol::Command::Update {
collection,
id,
document: data.into_bytes(),
};
match db_clone.execute_command(command) {
Ok(_) => Ok(()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // delete function
let db_clone = db.clone();
falcot_db.set("delete", lua.create_function(move |_, (collection, id): (String, String)| {
let command = protocol::Command::Delete {
collection,
id,
};
match db_clone.execute_command(command) {
Ok(_) => Ok(()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // query function
let db_clone = db.clone();
falcot_db.set("query", lua.create_function(move |_, (collection, filter): (String, String)| {
let command = protocol::Command::Query {
collection,
filter: filter.into_bytes(),
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => {
Ok(String::from_utf8_lossy(&data).to_string())
}
protocol::Response::Error(e) => {
Err(rlua::Error::RuntimeError(e))
}
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // call_procedure function
let db_clone = db.clone();
falcot_db.set("call_procedure", lua.create_function(move |_, name: String| {
let command = protocol::Command::CallProcedure { name };
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => {
Ok(String::from_utf8_lossy(&data).to_string())
}
protocol::Response::Error(e) => {
Err(rlua::Error::RuntimeError(e))
}
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // Function for executing Lua scripts (without capturing self)
let scripts_dir = self.lua_scripts_dir().to_string();
falcot_db.set("execute", lua.create_function(move |_, script_name: String| {
let full_path = format!("{}/{}", scripts_dir, script_name);
match fs::read_to_string(&full_path) {
Ok(script_content) => {
                    // Create a temporary Lua context to run the script in
let lua = Lua::new();
match lua.load(&script_content).exec() {
Ok(_) => Ok(format!("Script {} executed successfully", script_name)),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
}
Err(e) => Err(rlua::Error::RuntimeError(format!("Failed to read script: {}", e))),
}
})?)?;
        // Function for creating a backup
let db_clone = db.clone();
falcot_db.set("backup_start", lua.create_function(move |_, ()| {
match db_clone.create_backup() {
Ok(backup) => {
                    // Write the backup to a file
let backup_dir = "./backups";
if let Err(e) = std::fs::create_dir_all(backup_dir) {
return Err(rlua::Error::RuntimeError(format!("Failed to create backup directory: {}", e)));
}
let timestamp = chrono::Local::now().format("%Y%m%d_%H%M%S");
let backup_path = format!("{}/backup_{}.json", backup_dir, timestamp);
let json = serde_json::to_string(&backup)
.map_err(|e| rlua::Error::RuntimeError(format!("Failed to serialize backup: {}", e)))?;
std::fs::write(&backup_path, json)
.map_err(|e| rlua::Error::RuntimeError(format!("Failed to write backup: {}", e)))?;
Ok(format!("Backup created successfully: {}", backup_path))
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // Function for restoring from a backup
let db_clone = db.clone();
falcot_db.set("backup_restore", lua.create_function(move |_, backup_path: String| {
let content = std::fs::read_to_string(&backup_path)
.map_err(|e| rlua::Error::RuntimeError(format!("Failed to read backup: {}", e)))?;
let backup: HashMap<String, HashMap<String, Vec<u8>>> = serde_json::from_str(&content)
.map_err(|e| rlua::Error::RuntimeError(format!("Failed to parse backup: {}", e)))?;
match db_clone.restore_from_backup(backup) {
Ok(_) => Ok("Backup restored successfully".to_string()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // Function for adding a trigger
let db_clone = db.clone();
falcot_db.set("add_trigger", lua.create_function(move |_, (name, event, collection, code): (String, String, String, String)| {
let trigger_event = match event.as_str() {
"before_create" => TriggerEvent::BeforeCreate,
"after_create" => TriggerEvent::AfterCreate,
"before_update" => TriggerEvent::BeforeUpdate,
"after_update" => TriggerEvent::AfterUpdate,
"before_delete" => TriggerEvent::BeforeDelete,
"after_delete" => TriggerEvent::AfterDelete,
_ => return Err(rlua::Error::RuntimeError("Invalid trigger event".to_string())),
};
let trigger = Trigger {
name,
event: trigger_event,
collection,
lua_code: code,
};
match db_clone.add_trigger(trigger) {
Ok(_) => Ok("Trigger added successfully".to_string()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // Function for creating an index
let db_clone = db.clone();
falcot_db.set("create_index", lua.create_function(move |_, (collection, name, field, unique): (String, String, String, bool)| {
let index = Index {
name,
index_type: IndexType::Secondary,
field,
unique,
};
let command = protocol::Command::CreateIndex { collection, index };
match db_clone.execute_command(command) {
Ok(_) => Ok("Index created successfully".to_string()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // Function for querying by index
let db_clone = db.clone();
falcot_db.set("query_by_index", lua.create_function(move |_, (collection, index_name, value): (String, String, String)| {
let command = protocol::Command::QueryByIndex {
collection,
index_name,
value: value.into_bytes(),
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => Ok(String::from_utf8_lossy(&data).to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // New functions for constraints
let db_clone = db.clone();
falcot_db.set("add_constraint", lua.create_function(move |_, (collection, name, constraint_type, field, value): (String, String, String, String, Option<String>)| {
let value_bytes = value.map(|v| v.into_bytes()).unwrap_or_default();
let command = protocol::Command::AddConstraint {
collection,
constraint_name: name,
constraint_type,
field,
value: value_bytes,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Constraint added successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("remove_constraint", lua.create_function(move |_, (collection, name): (String, String)| {
let command = protocol::Command::RemoveConstraint {
collection,
constraint_name: name,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Constraint removed successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // New functions for sharding
let db_clone = db.clone();
falcot_db.set("add_shard_node", lua.create_function(move |_, (node_id, address, capacity_mb): (String, String, u64)| {
let capacity = capacity_mb * 1024 * 1024;
let command = protocol::Command::AddShardNode {
node_id,
address,
capacity,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Shard node added successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("remove_shard_node", lua.create_function(move |_, node_id: String| {
let command = protocol::Command::RemoveShardNode {
node_id,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Shard node removed successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("migrate_shard", lua.create_function(move |_, (collection, from_node, to_node, shard_key): (String, String, String, String)| {
let command = protocol::Command::MigrateShard {
collection,
from_node,
to_node,
shard_key,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Shard migration started successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("get_cluster_status", lua.create_function(move |_, ()| {
let command = protocol::Command::GetClusterStatus;
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => {
if let Ok(status) = protocol::deserialize::<protocol::ClusterStatus>(&data) {
Ok(format!("Cluster status: {} nodes, {} MB used",
status.nodes.len(), status.total_used / 1024 / 1024))
} else {
Ok("Failed to parse cluster status".to_string())
}
}
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("rebalance_cluster", lua.create_function(move |_, ()| {
let command = protocol::Command::RebalanceCluster;
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Cluster rebalance started successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // New functions for compression
let db_clone = db.clone();
falcot_db.set("enable_compression", lua.create_function(move |_, (collection, algorithm): (String, String)| {
let command = protocol::Command::EnableCompression {
collection,
algorithm,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Compression enabled successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("disable_compression", lua.create_function(move |_, collection: String| {
let command = protocol::Command::DisableCompression {
collection,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Compression disabled successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // New functions for global indexes
let db_clone = db.clone();
falcot_db.set("create_global_index", lua.create_function(move |_, (name, field, unique): (String, String, bool)| {
let command = protocol::Command::CreateGlobalIndex {
name,
field,
unique,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(_) => Ok("Global index created successfully".to_string()),
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
falcot_db.set("query_global_index", lua.create_function(move |_, (index_name, value): (String, String)| {
let command = protocol::Command::QueryGlobalIndex {
index_name,
value: value.into_bytes(),
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => {
if let Ok(ids) = serde_json::from_slice::<Vec<String>>(&data) {
Ok(format!("Found {} documents", ids.len()))
} else {
Ok("Failed to parse results".to_string())
}
}
protocol::Response::Error(e) => Err(rlua::Error::RuntimeError(e)),
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
        // Put the table into the global namespace
lua.globals().set("falcot_db", falcot_db)?;
Ok(())
}
    /// Returns the Lua scripts directory
fn lua_scripts_dir(&self) -> &'static str {
"lua_scripts"
}
}
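// A minimal usage sketch (assumptions: the Database type from src/server/database.rs
// and an illustrative "users" collection): create the engine, register the falcot_db
// table, then run a small Lua snippet against it.
#[cfg(test)]
mod lua_engine_sketch {
    use super::LuaEngine;
    use crate::server::database::Database;
    use std::sync::Arc;
    #[test]
    fn registers_and_calls_db_functions() {
        let engine = LuaEngine::new().unwrap();
        let db = Arc::new(Database::new());
        engine.register_db_functions(db).unwrap();
        // falcot_db.create expects (collection, json_string), as registered above.
        engine
            .execute_script(r#"falcot_db.create("users", '{"name": "Alice"}')"#)
            .unwrap();
    }
}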

50
src/server/messagepack.rs Normal file
View File

@ -0,0 +1,50 @@
// messagepack.rs
//! MessagePack protocol handling with wait-free serialization
//!
//! Provides wait-free encoding and decoding of messages
//! using fast serialization algorithms.
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::common::error::Result;
use crate::common::protocol;
/// Wait-free encoding of a message into MessagePack
#[allow(dead_code)]
pub async fn encode_message<T: serde::Serialize>(
writer: &mut (impl AsyncWriteExt + Unpin),
message: &T,
) -> Result<()> {
let bytes = protocol::serialize(message)?;
    // Wait-free send of the message length (4 bytes)
let length = bytes.len() as u32;
writer.write_all(&length.to_be_bytes()).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
    // Wait-free send of the message body
writer.write_all(&bytes).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
Ok(())
}
/// Wait-free decoding of a message from MessagePack
#[allow(dead_code)]
pub async fn decode_message<T: for<'a> serde::Deserialize<'a>>(
reader: &mut (impl AsyncReadExt + Unpin),
) -> Result<T> {
    // Wait-free read of the message length
let mut len_bytes = [0u8; 4];
reader.read_exact(&mut len_bytes).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
let length = u32::from_be_bytes(len_bytes) as usize;
    // Wait-free read of the message body
let mut buffer = vec![0u8; length];
reader.read_exact(&mut buffer).await
.map_err(|e| crate::common::error::FalcotError::NetworkError(e.to_string()))?;
protocol::deserialize(&buffer)
}
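// Round-trip sketch over an in-memory duplex pipe. Assumption: protocol::Command
// derives Serialize/Deserialize, as its use inside ReplicationMessage implies.
#[cfg(test)]
mod roundtrip_sketch {
    use super::{decode_message, encode_message};
    use crate::common::protocol;
    #[tokio::test]
    async fn length_prefixed_frame_survives_round_trip() {
        let (mut client, mut server) = tokio::io::duplex(1024);
        let cmd = protocol::Command::Read {
            collection: "users".to_string(),
            id: "42".to_string(),
        };
        encode_message(&mut client, &cmd).await.unwrap();
        // The 4-byte length prefix written by encode_message is consumed here.
        let decoded: protocol::Command = decode_message(&mut server).await.unwrap();
        match decoded {
            protocol::Command::Read { collection, id } => {
                assert_eq!(collection, "users");
                assert_eq!(id, "42");
            }
            _ => panic!("unexpected command variant"),
        }
    }
}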

232
src/server/mod.rs Normal file
View File

@ -0,0 +1,232 @@
// src/server/mod.rs
//! Falcot server - a document-oriented database with a wait-free architecture
//!
//! The main server module, implementing wait-free data access,
//! synchronous master-master replication, and HTTP/HTTPS support.
#![allow(dead_code)]
use std::sync::Arc;
use std::fs::OpenOptions;
use std::io::Write;
use crate::common::error::Result;
use crate::lua_shell::LuaShell;
// Submodules
pub mod database;
pub mod replication;
pub mod lua_engine;
pub mod messagepack;
pub mod config;
pub mod http;
pub mod sharding; // Sharding module
/// Logs a message to the log file
fn log_to_file(message: &str) {
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open("falcot.log")
{
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
}
/// Prints text using an ANSI color
#[allow(dead_code)]
fn print_colored(text: &str, ansi_color: &str) {
println!("{}{}\x1b[0m", ansi_color, text);
}
/// Converts a HEX color into an ANSI escape code
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
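// Quick check of the truecolor escape produced above; malformed input falls back to white.
#[cfg(test)]
mod color_tests {
    use super::hex_to_ansi;
    #[test]
    fn converts_hex_to_truecolor_escape() {
        assert_eq!(hex_to_ansi("#00bfff"), "\x1b[38;2;0;191;255m");
        assert_eq!(hex_to_ansi("not-a-color"), "\x1b[38;2;255;255;255m");
    }
}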
/// The main Falcot server with the wait-free architecture
pub struct FalcotServer {
config: config::Config,
database: Arc<database::Database>,
lua_engine: lua_engine::LuaEngine,
replication: Arc<replication::ReplicationManager>,
http_enabled: bool,
}
impl FalcotServer {
    /// Creates a new server with the wait-free architecture
pub async fn new(config_path: &str) -> Result<Self> {
        // Load the configuration
let config = config::Config::load(config_path)?;
        // Initialize components using wait-free approaches
let database = Arc::new(database::Database::new());
let lua_engine = lua_engine::LuaEngine::new()?;
let replication = Arc::new(replication::ReplicationManager::new(
config.replication.master_nodes.iter()
.map(|addr| addr.to_string())
.collect(),
config.replication.sync_interval,
config.replication.enabled,
));
        // Register DB functions in Lua
lua_engine.register_db_functions(database.clone())?;
        // Initialize the database
FalcotServer::initialize_database(database.clone())?;
        // Check whether HTTP mode is enabled
let http_enabled = config.server.http_port.is_some() || config.server.https_port.is_some();
Ok(Self {
config,
database,
lua_engine,
replication,
http_enabled,
})
}
    /// Initializes the database with wait-free structures
fn initialize_database(db: Arc<database::Database>) -> Result<()> {
        // Create system collections with wait-free access
let _system_collection = db.get_collection("_system");
let _users_collection = db.get_collection("_users");
let _logs_collection = db.get_collection("_logs");
let _procedures_collection = db.get_collection("_procedures");
let _triggers_collection = db.get_collection("_triggers");
        // Create the backup directory
let backup_dir = "/falcot/backups";
if let Err(e) = std::fs::create_dir_all(backup_dir) {
eprintln!("Warning: Failed to create backup directory: {}", e);
}
let message = "Database initialized with system collections";
println!("{}", message);
log_to_file(message);
Ok(())
}
    /// Runs the server with the wait-free architecture
pub async fn run(&self) -> Result<()> {
        // Print the welcome message in color #00bfff
println!();
let color_code = hex_to_ansi("#00bfff");
print_colored("Falcot Database Server", &color_code);
println!();
println!("Version: 1.0.0");
println!("Features: Wait-Free Architecture, Master-Master Replication, Lua Scripting, HTTP/HTTPS Support");
log_to_file("Falcot Database Server started");
log_to_file("Version: 1.0.0");
        // Start the HTTP/HTTPS servers in separate tasks, if configured
if self.http_enabled {
self.start_http_servers().await?;
}
        // Start the Lua interpreter
println!("Starting Lua interpreter...");
log_to_file("Starting Lua interpreter...");
let mut lua_shell = LuaShell::new(
self.lua_engine.clone(),
self.database.clone(),
self.replication.clone()
);
        // Run the interactive shell
lua_shell.run().await?;
Ok(())
}
    /// Starts the HTTP/HTTPS servers in separate tasks
async fn start_http_servers(&self) -> Result<()> {
let static_config = http::StaticFilesConfig::default();
let acl_config = http::AclConfig {
enabled: self.config.acl.enabled,
allowed_ips: self.config.acl.allowed_ips.clone(),
denied_ips: self.config.acl.denied_ips.clone(),
};
        // Start the HTTP server, if configured
if let Some(http_port) = self.config.server.http_port {
let http_addr = format!("{}:{}", self.config.server.host, http_port);
let http_config = http::HttpConfig {
enabled: true,
port: http_port,
http2_enabled: self.config.server.http2_enabled.unwrap_or(false),
};
let db_clone = self.database.clone();
let static_config_clone = static_config.clone();
let acl_config_clone = acl_config.clone();
            // Start the HTTP server in a separate task (does not block the main thread)
tokio::spawn(async move {
match http::start_http_server(&http_addr, db_clone, static_config_clone, http_config, acl_config_clone).await {
Ok(_) => {
let message = format!("HTTP server started on {}", http_addr);
println!("{}", message);
log_to_file(&message);
}
Err(e) => {
let message = format!("Failed to start HTTP server: {}", e);
eprintln!("{}", message);
log_to_file(&message);
}
}
});
}
        // Start the HTTPS server, if configured
if let Some(https_port) = self.config.server.https_port {
let https_addr = format!("{}:{}", self.config.server.host, https_port);
let tls_config = http::TlsConfig {
enabled: self.config.tls.enabled,
cert_path: self.config.tls.cert_path.clone(),
key_path: self.config.tls.key_path.clone(),
};
let db_clone = self.database.clone();
let static_config_clone = static_config.clone();
let acl_config_clone = acl_config.clone();
            // Start the HTTPS server in a separate task
tokio::spawn(async move {
match http::start_https_server(&https_addr, db_clone, static_config_clone, tls_config, acl_config_clone).await {
Ok(_) => {
let message = format!("HTTPS server started on {}", https_addr);
println!("{}", message);
log_to_file(&message);
}
Err(e) => {
let message = format!("Failed to start HTTPS server: {}", e);
eprintln!("{}", message);
log_to_file(&message);
}
}
});
}
Ok(())
}
}

225
src/server/replication.rs Normal file
View File

@ -0,0 +1,225 @@
// src/server/replication.rs
//! Master-Master replication module with a wait-free architecture
//!
//! Implements synchronous replication between nodes, using atomic
//! operations and lock-free data structures to provide
//! wait-free access.
#![allow(dead_code)]
#![allow(unused_variables)]
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
use serde::{Serialize, Deserialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::io::AsyncWriteExt;
use crate::common::error::Result;
use crate::common::protocol;
/// Replication manager with a wait-free architecture
#[derive(Clone)]
pub struct ReplicationManager {
nodes: Arc<Vec<String>>,
tx: Arc<mpsc::Sender<ReplicationEvent>>,
sync_interval: u64,
sequence_number: Arc<AtomicU64>,
enabled: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ReplicationEvent {
Command(protocol::Command),
SyncRequest,
Heartbeat,
}
impl ReplicationManager {
    /// Creates a new replication manager with the wait-free architecture
pub fn new(nodes: Vec<String>, sync_interval: u64, enabled: bool) -> Self {
let nodes_arc = Arc::new(nodes);
let (tx, mut rx) = mpsc::channel(1000);
let sequence_number = Arc::new(AtomicU64::new(0));
let sequence_clone = sequence_number.clone();
let nodes_clone = nodes_arc.clone();
let tx_arc = Arc::new(tx);
let tx_clone = tx_arc.clone();
let enabled_clone = enabled;
        // Start the background replication task with a wait-free approach
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(sync_interval));
loop {
tokio::select! {
Some(event) = rx.recv() => {
if !enabled_clone {
                            continue; // Skip processing if replication is disabled
}
match event {
ReplicationEvent::Command(cmd) => {
                                // Wait-free replication of the command to the other nodes
let nodes = nodes_clone.clone();
let seq = sequence_clone.fetch_add(1, Ordering::SeqCst);
                                // Use a wait-free approach with asynchronous tasks
for node in nodes.iter() {
let node_clone = node.clone();
let cmd_clone = cmd.clone();
tokio::spawn(async move {
if let Err(e) = Self::send_command_to_node(&node_clone, &cmd_clone, seq).await {
eprintln!("Failed to replicate command to {}: {}", node_clone, e);
}
});
}
}
ReplicationEvent::SyncRequest => {
                                // Wait-free synchronization with the other nodes
println!("Starting sync with {} nodes", nodes_clone.len());
for node in nodes_clone.iter() {
let node_clone = node.clone();
tokio::spawn(async move {
if let Err(e) = Self::sync_with_node(&node_clone).await {
eprintln!("Failed to sync with {}: {}", node_clone, e);
}
});
}
}
ReplicationEvent::Heartbeat => {
                                // Wait-free heartbeat send, only when replication is enabled
if enabled_clone {
for node in nodes_clone.iter() {
let node_clone = node.clone();
tokio::spawn(async move {
if let Err(e) = Self::send_heartbeat(&node_clone).await {
eprintln!("Heartbeat failed for {}: {}", node_clone, e);
}
});
}
}
}
}
}
_ = interval.tick() => {
                        // Periodic heartbeat, only when replication is enabled
if enabled_clone {
if let Err(e) = tx_clone.send(ReplicationEvent::Heartbeat).await {
eprintln!("Failed to send heartbeat: {}", e);
}
}
}
}
}
});
Self {
nodes: nodes_arc,
tx: tx_arc,
sync_interval,
sequence_number,
enabled,
}
}
    /// Wait-free send of a command to a remote node
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream,
Err(e) => {
                // Log the connection error but do not abort execution
eprintln!("Failed to connect to {}: {}", node, e);
return Ok(());
}
};
let message = protocol::ReplicationMessage {
sequence,
command: command.clone(),
timestamp: chrono::Utc::now().timestamp(),
};
        // Use the existing serialization helper
let bytes = protocol::serialize(&message)?;
if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send command to {}: {}", node, e);
}
Ok(())
}
    /// Wait-free synchronization with a remote node
async fn sync_with_node(_node: &str) -> Result<()> {
        // TODO: implement wait-free data synchronization
Ok(())
}
    /// Wait-free send of a heartbeat to a remote node
async fn send_heartbeat(node: &str) -> Result<()> {
let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream,
Err(e) => {
                // Log the connection error but do not abort execution
eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
return Ok(());
}
};
let heartbeat = protocol::ReplicationMessage {
sequence: 0,
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
timestamp: chrono::Utc::now().timestamp(),
};
let bytes = protocol::serialize(&heartbeat)?;
if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send heartbeat to {}: {}", node, e);
}
Ok(())
}
    /// Wait-free submission of a command for replication
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
if !self.enabled {
            return Ok(()); // Skip if replication is disabled
}
self.tx.send(ReplicationEvent::Command(command)).await
.map_err(|e| crate::common::error::FalcotError::ReplicationError(e.to_string()))
}
    /// Wait-free request for synchronization with the other nodes
pub async fn request_sync(&self) -> Result<()> {
if !self.enabled {
            return Ok(()); // Skip if replication is disabled
}
self.tx.send(ReplicationEvent::SyncRequest).await
.map_err(|e| crate::common::error::FalcotError::ReplicationError(e.to_string()))
}
    /// Returns the list of replication nodes (wait-free)
pub fn get_nodes(&self) -> &Vec<String> {
&self.nodes
}
    /// Returns the sync interval (wait-free)
pub fn get_sync_interval(&self) -> u64 {
self.sync_interval
}
    /// Returns the current sequence number (wait-free)
pub fn get_sequence_number(&self) -> u64 {
self.sequence_number.load(Ordering::SeqCst)
}
    /// Returns whether replication is enabled
pub fn is_enabled(&self) -> bool {
self.enabled
}
}
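// A minimal sketch of driving the manager. Note: the constructor spawns the
// background replication task, so a Tokio runtime is required.
#[cfg(test)]
mod replication_sketch {
    use super::ReplicationManager;
    use crate::common::protocol;
    #[tokio::test]
    async fn replicate_is_a_no_op_when_disabled() {
        // No peer nodes and replication disabled: replicate() returns Ok immediately.
        let manager = ReplicationManager::new(Vec::new(), 1000, false);
        assert!(!manager.is_enabled());
        let cmd = protocol::Command::CallProcedure { name: "heartbeat".to_string() };
        manager.replicate(cmd).await.unwrap();
        assert_eq!(manager.get_sequence_number(), 0);
    }
}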

228
src/server/sharding.rs Normal file
View File

@ -0,0 +1,228 @@
// src/server/sharding.rs
//! Sharding module with consistent hashing
use std::collections::{HashMap, BTreeMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::RwLock;
use siphasher::sip::SipHasher13;
use serde::{Serialize, Deserialize};
use crate::common::error::Result;
/// Information about a shard node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardNode {
pub node_id: String,
pub address: String,
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
}
/// Sharding state for a collection
#[derive(Debug, Clone)]
pub struct CollectionSharding {
pub shard_key: String,
pub virtual_nodes: usize,
pub ring: BTreeMap<u64, String>, // consistent hash ring
}
/// Sharding manager with consistent hashing
#[derive(Clone)]
pub struct ShardingManager {
nodes: Arc<RwLock<HashMap<String, ShardNode>>>,
collections: Arc<RwLock<HashMap<String, CollectionSharding>>>,
virtual_nodes_per_node: usize,
}
impl ShardingManager {
    /// Creates a new sharding manager
pub fn new(virtual_nodes_per_node: usize) -> Self {
Self {
nodes: Arc::new(RwLock::new(HashMap::new())),
collections: Arc::new(RwLock::new(HashMap::new())),
virtual_nodes_per_node,
}
}
    /// Adds a shard node
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
let mut nodes = self.nodes.write()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
let node = ShardNode {
node_id: node_id.clone(),
address,
capacity,
used: 0,
collections: Vec::new(),
};
nodes.insert(node_id, node);
Ok(())
}
    /// Removes a shard node
pub fn remove_node(&self, node_id: &str) -> Result<()> {
let mut nodes = self.nodes.write()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
nodes.remove(node_id);
Ok(())
}
    /// Sets up sharding for a collection
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
let mut collections = self.collections.write()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
let sharding = CollectionSharding {
shard_key: shard_key.to_string(),
virtual_nodes: self.virtual_nodes_per_node,
ring: BTreeMap::new(),
};
collections.insert(collection.to_string(), sharding);
self.rebuild_ring(collection)?;
Ok(())
}
    /// Rebuilds the hash ring for a collection
fn rebuild_ring(&self, collection: &str) -> Result<()> {
let nodes = self.nodes.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
let mut collections = self.collections.write()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
if let Some(sharding) = collections.get_mut(collection) {
sharding.ring.clear();
for (node_id, node) in nodes.iter() {
for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i);
let hash = self.hash_key(&key);
sharding.ring.insert(hash, node_id.clone());
}
}
}
Ok(())
}
    /// Hashes a key
fn hash_key(&self, key: &str) -> u64 {
let mut hasher = SipHasher13::new();
key.hash(&mut hasher);
hasher.finish()
}
    /// Finds the node responsible for a key
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
let collections = self.collections.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
if let Some(sharding) = collections.get(collection) {
let key_hash = self.hash_key(key_value);
            // Look up the hash ring (consistent hashing)
let mut range = sharding.ring.range(key_hash..);
if let Some((_, node_id)) = range.next() {
return Ok(Some(node_id.clone()));
}
            // If nothing is found in the upper part of the ring, wrap around to the first node
if let Some((_, node_id)) = sharding.ring.iter().next() {
return Ok(Some(node_id.clone()));
}
}
Ok(None)
}
    /// Migrates a shard
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
        // A real implementation would migrate the data here;
        // for now we only update the metadata.
println!("Migrating shard for collection '{}' from {} to {} with key {}",
collection, from_node, to_node, shard_key);
self.rebuild_ring(collection)?;
Ok(())
}
    /// Rebalances the cluster
pub fn rebalance_cluster(&self) -> Result<()> {
let nodes = self.nodes.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
let collection_names: Vec<String> = {
let collections = self.collections.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
collections.keys().cloned().collect()
};
println!("Rebalancing cluster with {} nodes", nodes.len());
        // Rebuild all hash rings
for collection_name in collection_names {
let mut collections = self.collections.write()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
if let Some(sharding) = collections.get_mut(&collection_name) {
sharding.ring.clear();
let nodes = self.nodes.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
for (node_id, _) in nodes.iter() {
for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i);
let hash = self.hash_key(&key);
sharding.ring.insert(hash, node_id.clone());
}
}
}
}
Ok(())
}
    /// Returns the cluster status
pub fn get_cluster_status(&self) -> Result<crate::common::protocol::ClusterStatus> {
let nodes = self.nodes.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
let mut cluster_nodes = Vec::new();
let mut total_capacity = 0;
let mut total_used = 0;
for (node_id, node) in nodes.iter() {
total_capacity += node.capacity;
total_used += node.used;
cluster_nodes.push(crate::common::protocol::ShardInfo {
node_id: node_id.clone(),
address: node.address.clone(),
capacity: node.capacity,
used: node.used,
collections: node.collections.clone(),
});
}
Ok(crate::common::protocol::ClusterStatus {
nodes: cluster_nodes,
total_capacity,
total_used,
            rebalance_needed: false, // simplified logic
})
}
    /// Returns information about a node
pub fn get_node(&self, node_id: &str) -> Result<Option<ShardNode>> {
let nodes = self.nodes.read()
.map_err(|e| crate::common::error::FalcotError::DatabaseError(e.to_string()))?;
Ok(nodes.get(node_id).cloned())
}
}
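// A minimal sketch of the consistent-hashing flow above: register nodes, shard a
// collection, then route a key to a node on the ring (node ids and addresses are
// illustrative).
#[cfg(test)]
mod sharding_sketch {
    use super::ShardingManager;
    #[test]
    fn routes_keys_to_registered_nodes() {
        let manager = ShardingManager::new(16);
        manager.add_node("node1".to_string(), "127.0.0.1:9001".to_string(), 1024).unwrap();
        manager.add_node("node2".to_string(), "127.0.0.1:9002".to_string(), 1024).unwrap();
        manager.setup_collection_sharding("users", "user_id").unwrap();
        // The same key always resolves to the same node while the ring is unchanged.
        let owner = manager.find_node_for_key("users", "user-42").unwrap().unwrap();
        assert!(owner == "node1" || owner == "node2");
        assert_eq!(manager.find_node_for_key("users", "user-42").unwrap().unwrap(), owner);
    }
}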

182
tests/integration_tests.rs Normal file
View File

@ -0,0 +1,182 @@
// tests/integration_tests.rs
#![allow(dead_code)]
#![allow(unused_imports)]
use falcot::server::database::{Database, Index, IndexType};
use falcot::common::protocol::Command;
use serde_json::json;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
/// Regression test - verifies that the core functions still work after changes
#[tokio::test]
async fn regression_test() {
let db = Arc::new(Database::new());
    // Exercise the basic CRUD operations
let collection = db.get_collection("test");
// Create
let doc_data = r#"{"name": "test", "value": 42}"#.as_bytes().to_vec();
let id = collection.create_document(doc_data.clone()).unwrap();
assert!(!id.is_empty());
// Read
let document = collection.read_document(&id).unwrap().unwrap();
assert_eq!(document, doc_data);
// Update
let updated_data = r#"{"name": "test", "value": 43}"#.as_bytes().to_vec();
collection.update_document(&id, updated_data.clone()).unwrap();
// Delete
collection.delete_document(&id).unwrap();
assert!(collection.read_document(&id).unwrap().is_none());
println!("Regression test passed");
}
/// Unit test - exercises a specific module (indexes)
#[tokio::test]
async fn unit_test_indexes() {
let db = Arc::new(Database::new());
let collection = db.get_collection("index_test");
    // Create an index
let index = Index {
name: "name_index".to_string(),
index_type: IndexType::Secondary,
field: "name".to_string(),
unique: false,
};
collection.create_index(index).unwrap();
    // Add documents
let doc1 = json!({"name": "Alice", "age": 30}).to_string().into_bytes();
let doc2 = json!({"name": "Bob", "age": 25}).to_string().into_bytes();
let id1 = collection.create_document(doc1).unwrap();
let id2 = collection.create_document(doc2).unwrap();
    // Query by index
let alice_ids = collection.query_by_index("name_index", &json!("Alice")).unwrap();
assert_eq!(alice_ids, vec![id1]);
let bob_ids = collection.query_by_index("name_index", &json!("Bob")).unwrap();
assert_eq!(bob_ids, vec![id2]);
println!("Unit test for indexes passed");
}
/// Smoke test - verifies that the system starts and the basic functions work
#[tokio::test]
async fn smoke_test() {
let db = Arc::new(Database::new());
    // Verify that a collection can be created
let collection = db.get_collection("smoke_test");
assert_eq!(collection.get_name(), "smoke_test");
    // Verify that a command can be executed
let command = Command::Create {
collection: "smoke_test".to_string(),
document: r#"{"test": "data"}"#.as_bytes().to_vec(),
};
let response = db.execute_command(command).unwrap();
assert!(matches!(response, falcot::common::protocol::Response::Success(_)));
println!("Smoke test passed");
}
/// Load test - checks performance under load
#[tokio::test]
async fn load_test() {
let db = Arc::new(Database::new());
let collection = db.get_collection("load_test");
let start_time = std::time::Instant::now();
let mut tasks = Vec::new();
    // Create 1000 documents in parallel
for i in 0..1000 {
let db_clone = db.clone();
tasks.push(tokio::spawn(async move {
let command = Command::Create {
collection: "load_test".to_string(),
document: format!(r#"{{"id": {}, "data": "test_{}"}}"#, i, i).into_bytes(),
};
db_clone.execute_command(command).unwrap();
}));
}
    // Wait for all tasks to finish
for task in tasks {
task.await.unwrap();
}
let duration = start_time.elapsed();
println!("Load test completed in {:?}", duration);
    // Verify that all documents were created
let count = collection.count_documents().unwrap();
assert_eq!(count, 1000);
assert!(duration.as_secs() < 5, "Load test took too long: {:?}", duration);
}
/// Stress test - checks system stability under extreme conditions
#[tokio::test]
async fn stress_test() {
let db = Arc::new(Database::new());
let collection = db.get_collection("stress_test");
let mut tasks = Vec::new();
    // Create competing read/write operations
for i in 0..500 {
let db_clone = db.clone();
        // Writer tasks
if i % 2 == 0 {
tasks.push(tokio::spawn(async move {
for j in 0..10 {
let command = Command::Create {
collection: "stress_test".to_string(),
document: format!(r#"{{"thread": {}, "iteration": {}}}"#, i, j).into_bytes(),
};
let _ = db_clone.execute_command(command);
sleep(Duration::from_millis(10)).await;
}
}));
}
        // Reader tasks
else {
tasks.push(tokio::spawn(async move {
for _ in 0..10 {
let command = Command::Query {
collection: "stress_test".to_string(),
filter: vec![],
};
let _ = db_clone.execute_command(command);
sleep(Duration::from_millis(15)).await;
}
}));
}
}
    // Wait for completion with a timeout
let result = tokio::time::timeout(Duration::from_secs(30), async {
for task in tasks {
task.await.unwrap();
}
}).await;
assert!(result.is_ok(), "Stress test timed out");
    // Verify that the system did not crash and the data was persisted
let count = collection.count_documents().unwrap();
assert!(count > 0, "No documents were created during stress test");
println!("Stress test passed with {} documents", count);
}