first commit

Григорий Сафронов 2025-11-28 01:03:13 +03:00
commit d5b9ba872c
16 changed files with 6208 additions and 0 deletions

1649
Cargo.lock generated Executable file

File diff suppressed because it is too large

38
Cargo.toml Executable file

@@ -0,0 +1,38 @@
# Cargo.toml
[package]
name = "futriix"
version = "1.0.0"
edition = "2024"
[dependencies]
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rmp-serde = "1.1"
rmp = "0.8"
toml = "0.8"
rlua = "0.20.1"
crossbeam = "0.8"
dashmap = "5.0"
log = "0.4"
env_logger = "0.10"
anyhow = "1.0"
thiserror = "1.0"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.24"
rustls = "0.21"
rustls-pemfile = "1.0"
tokio-rustls = "0.24"
siphasher = "1.0.1"
csv = "1.3"
futures = "0.3" # Добавлена зависимость futures
[dev-dependencies]
tokio = { version = "1.0", features = ["full", "rt-multi-thread", "time"] }
[[test]]
name = "integration_tests"
path = "tests/integration_tests.rs"
harness = true

82
config.toml Executable file

@@ -0,0 +1,82 @@
# config.toml
# Futriix Server configuration with wait-free architecture
[server]
host = "127.0.0.1"
http_port = 9090
https_port = 8443
max_connections = 10000
connection_timeout = 30
http2_enabled = true
http = true # New directive: enable the HTTP server
https = false # New directive: enable the HTTPS server
[tls]
enabled = false
cert_path = "/futriix/certs/server.crt" # changed from /falcot/certs
key_path = "/futriix/certs/server.key" # changed from /falcot/certs
[replication]
enabled = true
master_nodes = [
"node1.futriix:9090", # Изменено с falcot на futriix
"node2.futriix:9090",
"node3.futriix:9090"
]
sync_interval = 1000 # ms
replication_factor = 3
[sharding]
enabled = true
shards = 3
replication_factor = 2
auto_balance = true
[cluster] # New section for the cluster
enabled = true
name = "futriix-main-cluster"
[acl]
enabled = false
allowed_ips = ["127.0.0.1", "192.168.1.0/24"]
denied_ips = ["10.0.0.5"]
[logging]
level = "info"
file_path = "/futriix/logs/futriix.log" # changed from falcot.log
max_file_size = 10485760 # 10MB
backup_count = 5
[backup]
enabled = true
interval = 3600 # 1 hour
retention = 7 # days
path = "/futriix/backups" # Изменено с /falcot/backups
[csv] # New section for CSV
import_dir = "/futriix/csv/import" # import directory
export_dir = "/futriix/csv/export" # export directory
max_file_size = 104857600 # 100MB
[security]
require_authentication = false
jwt_secret = "your-secret-key-here"
password_hashing_rounds = 12
[performance]
max_memory_mb = 1024
cache_size_mb = 512
worker_threads = 4
io_threads = 2
[monitoring]
enabled = false
prometheus_port = 9090
health_check_interval = 30
[limits]
max_documents_per_collection = 1000000
max_collections = 1000
max_indexes_per_collection = 16
request_timeout_ms = 5000
max_request_size_mb = 10
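
To show how these tables map onto typed Rust, here is a minimal standalone sketch using serde and the toml crate; the `Limits` struct is an illustrative stand-in, not the server's actual type (the real config structs live in src/common/mod.rs below).

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Limits {
    max_documents_per_collection: u64,
    max_collections: u32,
    max_indexes_per_collection: u32,
    request_timeout_ms: u64,
    max_request_size_mb: u64,
}

fn main() {
    // The [limits] table above, reduced to a snippet.
    let raw = r#"
max_documents_per_collection = 1000000
max_collections = 1000
max_indexes_per_collection = 16
request_timeout_ms = 5000
max_request_size_mb = 10
"#;
    // toml::from_str maps TOML keys onto struct fields by name.
    let limits: Limits = toml::from_str(raw).expect("valid TOML");
    println!("{:?}", limits);
}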

389
futriix.log Executable file

@@ -0,0 +1,389 @@
[2025-11-22 00:09:24] Starting Futriix server
[2025-11-22 00:09:24] Loading configuration from: config.toml
[2025-11-22 00:09:24] Database initialized with system collections
[2025-11-22 00:09:24] Server created successfully
[2025-11-22 00:09:24] Futriix Database Server started
[2025-11-22 00:09:24] Version: 1.0.0
[2025-11-22 00:09:24] Starting Lua interpreter...
[2025-11-22 00:09:24] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 00:09:24] Futriix server stopped
[2025-11-22 00:11:46] Starting Futriix server
[2025-11-22 00:11:46] Loading configuration from: config.toml
[2025-11-22 00:11:46] Database initialized with system collections
[2025-11-22 00:11:46] Server created successfully
[2025-11-22 00:11:46] Futriix Database Server started
[2025-11-22 00:11:46] Version: 1.0.0
[2025-11-22 00:11:46] Starting Lua interpreter...
[2025-11-22 00:11:46] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 00:12:24] Futriix server stopped
[2025-11-22 00:13:20] Starting Futriix server
[2025-11-22 00:13:20] Loading configuration from: config.toml
[2025-11-22 00:13:20] Database initialized with system collections
[2025-11-22 00:13:20] Server created successfully
[2025-11-22 00:13:20] Futriix Database Server started
[2025-11-22 00:13:20] Version: 1.0.0
[2025-11-22 00:13:20] Starting Lua interpreter...
[2025-11-22 00:13:20] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 00:13:40] Futriix server stopped
[2025-11-22 00:19:36] Starting Futriix server
[2025-11-22 00:19:36] Loading configuration from: config.toml
[2025-11-22 00:19:36] Database initialized with system collections
[2025-11-22 00:19:36] Server created successfully
[2025-11-22 00:19:36] Futriix Database Server started
[2025-11-22 00:19:36] Version: 1.0.0
[2025-11-22 00:19:36] Starting Lua interpreter...
[2025-11-22 00:19:36] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 00:19:58] Futriix server stopped
[2025-11-22 00:25:01] Starting Futriix server
[2025-11-22 00:25:01] Loading configuration from: config.toml
[2025-11-22 00:25:01] Database initialized with system collections
[2025-11-22 00:25:01] Server created successfully
[2025-11-22 00:25:01] Futriix Database Server started
[2025-11-22 00:25:01] Version: 1.0.0
[2025-11-22 00:25:01] Starting Lua interpreter...
[2025-11-22 00:25:01] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 00:33:05] Starting Futriix server
[2025-11-22 00:33:05] Loading configuration from: config.toml
[2025-11-22 00:33:05] Database initialized with system collections
[2025-11-22 00:33:05] Server created successfully
[2025-11-22 00:33:05] Futriix Database Server started
[2025-11-22 00:33:05] Version: 1.0.0
[2025-11-22 00:33:05] Starting Lua interpreter...
[2025-11-22 00:33:05] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 01:01:31] Starting Futriix server
[2025-11-22 01:01:31] Loading configuration from: config.toml
[2025-11-22 01:01:31] Database initialized with system collections
[2025-11-22 01:01:31] Server created successfully
[2025-11-22 01:01:31] Futriix Database Server started
[2025-11-22 01:01:31] Version: 1.0.0
[2025-11-22 01:01:31] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-22 01:01:31] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-22 01:01:31] Starting Lua interpreter...
[2025-11-24 01:05:09] Starting Futriix server
[2025-11-24 01:05:09] Loading configuration from: config.toml
[2025-11-24 01:05:09] Database initialized with system collections
[2025-11-24 01:05:09] Server created successfully
[2025-11-24 01:05:09] Futriix Database Server started
[2025-11-24 01:05:09] Version: 1.0.0
[2025-11-24 01:05:09] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-24 01:05:09] Starting Lua interpreter...
[2025-11-24 01:05:09] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-24 01:05:29] Starting Futriix server
[2025-11-24 01:05:29] Loading configuration from: config.toml
[2025-11-24 01:05:29] Database initialized with system collections
[2025-11-24 01:05:29] Server created successfully
[2025-11-24 01:05:29] Futriix Database Server started
[2025-11-24 01:05:29] Version: 1.0.0
[2025-11-24 01:05:29] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-24 01:05:29] Starting Lua interpreter...
[2025-11-24 01:05:29] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-24 01:06:41] Futriix server stopped
[2025-11-24 01:06:43] Starting Futriix server
[2025-11-24 01:06:43] Loading configuration from: config.toml
[2025-11-24 01:06:43] Database initialized with system collections
[2025-11-24 01:06:43] Server created successfully
[2025-11-24 01:06:43] Futriix Database Server started
[2025-11-24 01:06:43] Version: 1.0.0
[2025-11-24 01:06:43] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-24 01:06:43] Starting Lua interpreter...
[2025-11-24 01:06:43] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-24 01:27:24] Starting Futriix server
[2025-11-24 01:27:24] Loading configuration from: config.toml
[2025-11-24 01:27:24] Database initialized with system collections
[2025-11-24 01:27:24] Server created successfully
[2025-11-24 01:27:24] Futriix Database Server started
[2025-11-24 01:27:24] Version: 1.0.0
[2025-11-24 01:27:24] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-24 01:27:24] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-24 01:28:06] Futriix server stopped
[2025-11-25 01:13:37] Starting Futriix server
[2025-11-25 01:13:37] Loading configuration from: config.toml
[2025-11-25 01:13:37] Database initialized with system collections
[2025-11-25 01:13:37] Server created successfully
[2025-11-25 01:13:37] Futriix Database Server started
[2025-11-25 01:13:37] Version: 1.0.0
[2025-11-25 01:13:37] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 01:13:37] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-25 01:13:57] Futriix server stopped
[2025-11-25 01:25:00] Starting Futriix server
[2025-11-25 01:25:00] Loading configuration from: config.toml
[2025-11-25 01:25:00] Database initialized with system collections
[2025-11-25 01:25:00] Server created successfully
[2025-11-25 01:25:00] Futriix Database Server started
[2025-11-25 01:25:00] Version: 1.0.0
[2025-11-25 01:25:00] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 01:25:00] Failed to start HTTPS server: HTTP error: Failed to open certificate: No such file or directory (os error 2)
[2025-11-25 01:33:11] Futriix server stopped
[2025-11-25 01:35:55] Starting Futriix server
[2025-11-25 01:35:55] Loading configuration from: config.toml
[2025-11-25 01:35:55] Database initialized with system collections
[2025-11-25 01:35:55] Server created successfully
[2025-11-25 01:35:55] Futriix Database Server started
[2025-11-25 01:35:55] Version: 1.0.0
[2025-11-25 01:35:55] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 01:35:55] HTTPS server started on 127.0.0.1:8443
[2025-11-25 21:31:08] Starting Futriix server
[2025-11-25 21:31:08] Loading configuration from: config.toml
[2025-11-25 21:31:08] Database initialized with system collections
[2025-11-25 21:31:08] Server created successfully
[2025-11-25 21:31:08] Futriix Database Server started
[2025-11-25 21:31:08] Version: 1.0.0
[2025-11-25 21:31:08] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 21:31:08] ACL: disabled
[2025-11-25 21:31:08] Replication: enabled
[2025-11-25 21:31:08] HTTPS server started on 127.0.0.1:8443
[2025-11-25 21:31:52] Futriix server stopped
[2025-11-25 21:40:34] Starting Futriix server
[2025-11-25 21:40:34] Loading configuration from: config.toml
[2025-11-25 21:40:34] Database initialized with system collections
[2025-11-25 21:40:34] Server created successfully
[2025-11-25 21:40:34] Futriix Database Server started
[2025-11-25 21:40:34] Version: 1.0.0
[2025-11-25 21:40:34] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 21:40:35] HTTPS server started on 127.0.0.1:8443
[2025-11-25 21:41:16] Futriix server stopped
[2025-11-25 21:51:01] Starting Futriix server
[2025-11-25 21:51:01] Loading configuration from: config.toml
[2025-11-25 21:51:01] Database initialized with system collections
[2025-11-25 21:51:01] Server created successfully
[2025-11-25 21:51:01] Futriix Database Server started
[2025-11-25 21:51:01] Version: 1.0.0
[2025-11-25 21:51:01] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 21:51:02] HTTPS server started on 127.0.0.1:8443
[2025-11-25 21:51:27] Futriix server stopped
[2025-11-25 22:15:32] Starting Futriix server
[2025-11-25 22:15:32] Loading configuration from: config.toml
[2025-11-25 22:15:32] Database initialized with system collections
[2025-11-25 22:15:32] Server created successfully
[2025-11-25 22:15:32] Futriix Database Server started
[2025-11-25 22:15:32] Version: 1.0.0
[2025-11-25 22:15:32] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 22:15:32] HTTPS server started on 127.0.0.1:8443
[2025-11-25 22:15:43] Futriix server stopped
[2025-11-25 22:24:30] Starting Futriix server
[2025-11-25 22:24:30] Loading configuration from: config.toml
[2025-11-25 22:24:30] Database initialized with system collections
[2025-11-25 22:24:30] Server created successfully
[2025-11-25 22:24:30] Futriix Database Server started
[2025-11-25 22:24:30] Version: 1.0.0
[2025-11-25 22:24:30] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 22:24:30] HTTPS server started on 127.0.0.1:8443
[2025-11-25 22:41:25] Starting Futriix server
[2025-11-25 22:41:25] Loading configuration from: config.toml
[2025-11-25 22:41:25] Database initialized with system collections
[2025-11-25 22:41:25] Server created successfully
[2025-11-25 22:41:25] Futriix Database Server started
[2025-11-25 22:41:25] Version: 1.0.0
[2025-11-25 22:41:25] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 22:41:25] HTTPS server started on 127.0.0.1:8443
[2025-11-25 22:41:57] Futriix server stopped
[2025-11-25 22:43:55] Starting Futriix server
[2025-11-25 22:43:55] Loading configuration from: config.toml
[2025-11-25 22:43:55] Database initialized with system collections
[2025-11-25 22:43:55] Server created successfully
[2025-11-25 22:43:55] Futriix Database Server started
[2025-11-25 22:43:55] Version: 1.0.0
[2025-11-25 22:43:55] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 22:43:55] HTTPS server started on 127.0.0.1:8443
[2025-11-25 22:44:09] Futriix server stopped
[2025-11-25 23:24:51] Starting Futriix server
[2025-11-25 23:24:51] Loading configuration from: config.toml
[2025-11-25 23:24:51] Database initialized with system collections
[2025-11-25 23:24:51] Server created successfully
[2025-11-25 23:24:51] Futriix Database Server started
[2025-11-25 23:24:51] Version: 1.0.0
[2025-11-25 23:24:51] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-25 23:24:51] HTTPS server started on 127.0.0.1:8443
[2025-11-25 23:24:55] Futriix server stopped
[2025-11-26 00:44:05] Starting Futriix server
[2025-11-26 00:44:05] Loading configuration from: config.toml
[2025-11-26 00:44:05] Database initialized with system collections
[2025-11-26 00:44:05] Server created successfully
[2025-11-26 00:44:05] Futriix Database Server started
[2025-11-26 00:44:05] Version: 1.0.0
[2025-11-26 00:44:05] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 00:44:05] HTTPS server started on 127.0.0.1:8443
[2025-11-26 00:44:52] Futriix server stopped
[2025-11-26 21:49:55] Starting Futriix server
[2025-11-26 21:49:55] Loading configuration from: config.toml
[2025-11-26 21:49:55] Database initialized with system collections
[2025-11-26 21:49:55] Server created successfully
[2025-11-26 21:49:55] Futriix Database Server started
[2025-11-26 21:49:55] Version: 1.0.0
[2025-11-26 21:49:55] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 21:49:55] HTTPS server started on 127.0.0.1:8443
[2025-11-26 21:50:34] Futriix server stopped
[2025-11-26 21:51:56] Starting Futriix server
[2025-11-26 21:51:56] Loading configuration from: config.toml
[2025-11-26 21:51:56] Database initialized with system collections
[2025-11-26 21:51:56] Server created successfully
[2025-11-26 21:51:56] Futriix Database Server started
[2025-11-26 21:51:56] Version: 1.0.0
[2025-11-26 21:51:56] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 21:51:57] HTTPS server started on 127.0.0.1:8443
[2025-11-26 21:52:50] Futriix server stopped
[2025-11-26 21:55:44] Starting Futriix server
[2025-11-26 21:55:44] Loading configuration from: config.toml
[2025-11-26 21:55:44] Database initialized with system collections
[2025-11-26 21:55:44] Server created successfully
[2025-11-26 21:55:44] Futriix Database Server started
[2025-11-26 21:55:44] Version: 1.0.0
[2025-11-26 21:55:44] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 21:55:44] HTTPS server started on 127.0.0.1:8443
[2025-11-26 21:56:18] Futriix server stopped
[2025-11-26 22:25:42] Starting Futriix server
[2025-11-26 22:25:42] Loading configuration from: config.toml
[2025-11-26 22:25:42] Database initialized with system collections
[2025-11-26 22:25:42] Server created successfully
[2025-11-26 22:25:42] Futriix Database Server started
[2025-11-26 22:25:42] Version: 1.0.0
[2025-11-26 22:25:42] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 22:25:42] HTTP server started on 127.0.0.1:8080
[2025-11-26 22:26:21] Futriix server stopped
[2025-11-26 22:27:51] Starting Futriix server
[2025-11-26 22:27:51] Loading configuration from: config.toml
[2025-11-26 22:27:51] Database initialized with system collections
[2025-11-26 22:27:51] Server created successfully
[2025-11-26 22:27:51] Futriix Database Server started
[2025-11-26 22:27:51] Version: 1.0.0
[2025-11-26 22:27:51] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 22:27:51] HTTP server started on 127.0.0.1:8080
[2025-11-26 22:28:36] Futriix server stopped
[2025-11-26 22:29:32] Starting Futriix server
[2025-11-26 22:29:32] Loading configuration from: config.toml
[2025-11-26 22:29:32] Database initialized with system collections
[2025-11-26 22:29:32] Server created successfully
[2025-11-26 22:29:32] Futriix Database Server started
[2025-11-26 22:29:32] Version: 1.0.0
[2025-11-26 22:29:32] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 22:29:32] HTTP server started on 127.0.0.1:9090
[2025-11-26 22:44:04] Starting Futriix server
[2025-11-26 22:44:04] Loading configuration from: config.toml
[2025-11-26 22:44:04] Database initialized with system collections
[2025-11-26 22:44:04] Server created successfully
[2025-11-26 22:44:04] Futriix Database Server started
[2025-11-26 22:44:04] Version: 1.0.0
[2025-11-26 22:44:04] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 22:56:20] Starting Futriix server
[2025-11-26 22:56:20] Loading configuration from: config.toml
[2025-11-26 22:56:20] Database initialized with system collections
[2025-11-26 22:56:20] Server created successfully
[2025-11-26 22:56:20] Futriix Database Server started
[2025-11-26 22:56:20] Version: 1.0.0
[2025-11-26 22:56:20] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 22:57:12] Futriix server stopped
[2025-11-26 23:05:43] Starting Futriix server
[2025-11-26 23:05:43] Loading configuration from: config.toml
[2025-11-26 23:05:43] Database initialized with system collections
[2025-11-26 23:05:43] Server created successfully
[2025-11-26 23:05:43] Futriix Database Server started
[2025-11-26 23:05:43] Version: 1.0.0
[2025-11-26 23:05:43] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 23:07:11] Futriix server stopped
[2025-11-26 23:07:19] Starting Futriix server
[2025-11-26 23:07:19] Loading configuration from: config.toml
[2025-11-26 23:07:19] Database initialized with system collections
[2025-11-26 23:07:19] Server created successfully
[2025-11-26 23:07:19] Futriix Database Server started
[2025-11-26 23:07:19] Version: 1.0.0
[2025-11-26 23:07:19] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-26 23:07:31] Futriix server stopped
[2025-11-27 20:29:48] Starting Futriix server
[2025-11-27 20:29:48] Loading configuration from: config.toml
[2025-11-27 20:29:48] Database initialized with system collections
[2025-11-27 20:29:48] Server created successfully
[2025-11-27 20:29:48] Futriix Database Server started
[2025-11-27 20:29:48] Version: 1.0.0
[2025-11-27 20:29:48] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:00:06] Starting Futriix server
[2025-11-27 21:00:06] Loading configuration from: config.toml
[2025-11-27 21:00:06] Database initialized with system collections
[2025-11-27 21:00:06] Server created successfully
[2025-11-27 21:00:06] Futriix Database Server started
[2025-11-27 21:00:06] Version: 1.0.0
[2025-11-27 21:00:06] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:00:06] HTTP server started on 127.0.0.1:9090
[2025-11-27 21:03:23] Futriix server stopped
[2025-11-27 21:05:05] Starting Futriix server
[2025-11-27 21:05:05] Loading configuration from: config.toml
[2025-11-27 21:05:05] Database initialized with system collections
[2025-11-27 21:05:05] Server created successfully
[2025-11-27 21:05:05] Futriix Database Server started
[2025-11-27 21:05:05] Version: 1.0.0
[2025-11-27 21:05:05] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:05:05] HTTP server started on 127.0.0.1:9090
[2025-11-27 21:05:16] Starting Futriix server
[2025-11-27 21:05:16] Loading configuration from: config.toml
[2025-11-27 21:05:16] Database initialized with system collections
[2025-11-27 21:05:16] Server created successfully
[2025-11-27 21:05:16] Futriix Database Server started
[2025-11-27 21:05:16] Version: 1.0.0
[2025-11-27 21:05:16] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:05:16] HTTP server started on 127.0.0.1:9090
[2025-11-27 21:35:10] Starting Futriix server
[2025-11-27 21:35:10] Loading configuration from: config.toml
[2025-11-27 21:35:10] Database initialized with system collections
[2025-11-27 21:35:10] Server created successfully
[2025-11-27 21:35:10] Futriix Database Server started
[2025-11-27 21:35:10] Version: 1.0.0
[2025-11-27 21:35:10] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:36:33] Starting Futriix server
[2025-11-27 21:36:33] Loading configuration from: config.toml
[2025-11-27 21:36:33] Database initialized with system collections
[2025-11-27 21:36:33] Server created successfully
[2025-11-27 21:36:33] Futriix Database Server started
[2025-11-27 21:36:33] Version: 1.0.0
[2025-11-27 21:36:33] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:38:49] Starting Futriix server
[2025-11-27 21:38:49] Loading configuration from: config.toml
[2025-11-27 21:38:49] Database initialized with system collections
[2025-11-27 21:38:49] Server created successfully
[2025-11-27 21:38:49] Futriix Database Server started
[2025-11-27 21:38:49] Version: 1.0.0
[2025-11-27 21:38:49] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 21:56:48] Starting Futriix server
[2025-11-27 21:56:48] Loading configuration from: config.toml
[2025-11-27 21:56:48] Database initialized with system collections
[2025-11-27 21:56:48] Server created successfully
[2025-11-27 21:56:48] Futriix Database Server started
[2025-11-27 21:56:48] Version: 1.0.0
[2025-11-27 21:56:48] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-27 22:14:09] Starting Futriix server
[2025-11-27 22:14:09] Loading configuration from: config.toml
[2025-11-27 22:14:09] Database initialized with system collections
[2025-11-27 22:14:09] Server created successfully
[2025-11-27 22:14:09] Futriix Database Server started
[2025-11-27 22:14:09] Version: 1.0.0
[2025-11-27 22:14:09] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-28 00:13:27] Starting Futriix server
[2025-11-28 00:13:27] Loading configuration from: config.toml
[2025-11-28 00:13:27] Database initialized with system collections
[2025-11-28 00:13:27] Server created successfully
[2025-11-28 00:13:27] Futriix Database Server started
[2025-11-28 00:13:27] Version: 1.0.0
[2025-11-28 00:13:27] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-28 00:37:01.851] Starting Futriix server
[2025-11-28 00:37:01.851] Loading configuration from: config.toml
[2025-11-28 00:37:01.858] Database initialized with system collections
[2025-11-28 00:37:01.858] Server created successfully
[2025-11-28 00:37:01.858] Futriix Database Server started
[2025-11-28 00:37:01.858] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-28 00:46:20.204] Starting Futriix server
[2025-11-28 00:46:20.204] Loading configuration from: config.toml
[2025-11-28 00:46:20.210] Database initialized with system collections
[2025-11-28 00:46:20.210] Server created successfully
[2025-11-28 00:46:20.210] Futriix Database Server started
[2025-11-28 00:46:20.210] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-28 00:57:07.501] Starting Futriix server
[2025-11-28 00:57:07.501] Loading configuration from: config.toml
[2025-11-28 00:57:07.504] Database initialized with system collections
[2025-11-28 00:57:07.504] Server created successfully
[2025-11-28 00:57:07.504] Futriix Database Server started
[2025-11-28 00:57:07.505] Mode: cluster (cluster: 'futriix-main-cluster')
[2025-11-28 00:57:42.988] Futriix server stopped
[2025-11-28 00:59:01.667] Starting Futriix server
[2025-11-28 00:59:01.667] Loading configuration from: config.toml
[2025-11-28 00:59:01.671] Database initialized with system collections
[2025-11-28 00:59:01.671] Server created successfully
[2025-11-28 00:59:01.671] Futriix Database Server started
[2025-11-28 00:59:01.671] Mode: cluster (cluster: 'futriix-main-cluster')

36
lua_script/init.lua Executable file

@@ -0,0 +1,36 @@
-- lua_scripts/init.lua
-- Initialization Lua script for Futriix Server
-- Runs automatically at server start to set up the environment
falcot_log("Initializing Futriix Server v1.0.0 with Lua scripting...")
-- Создаем глобальные функции для бэкапов
function futriix.engine.backup.start()
return futriix_db.backup_start()
end
function futriix.engine.backup.restore(backup_path)
return futriix_db.backup_restore(backup_path)
end
-- Example: create a collection at startup, tagged with a timestamp
local current_timestamp = os.date("%d-%m-%Y")
futriix_db.create("system_config", '{"key": "server_start_time", "value": "' .. current_timestamp .. '"}')
falcot_log("System configuration initialized with timestamp: " .. current_timestamp)
-- Example ACL check
function check_access(ip_address)
if ip_address == "127.0.0.1" then
return true
end
return false
end
-- Log cluster information
if futriix.cluster.enabled then
falcot_log("Cluster mode enabled: " .. futriix.cluster.name)
else
falcot_log("Standalone mode enabled")
end
falcot_log("Lua initialization script completed")

154
protocol.rs Executable file

@@ -0,0 +1,154 @@
// src/common/protocol.rs
//! Data exchange protocol for Falcot
//!
//! Defines the command and response structures for interaction between
//! system components, using wait-free serialization.
#![allow(dead_code)]
use serde::{Deserialize, Serialize};
use crate::server::database::Index;
/// Commands executed against the database
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
Create {
collection: String,
document: Vec<u8>,
},
Read {
collection: String,
id: String,
},
Update {
collection: String,
id: String,
document: Vec<u8>,
},
Delete {
collection: String,
id: String,
},
Query {
collection: String,
filter: Vec<u8>,
},
CreateProcedure {
name: String,
code: Vec<u8>,
},
CallProcedure {
name: String,
},
BeginTransaction {
transaction_id: String,
},
CommitTransaction {
transaction_id: String,
},
RollbackTransaction {
transaction_id: String,
},
CreateIndex {
collection: String,
index: Index,
},
QueryByIndex {
collection: String,
index_name: String,
value: Vec<u8>,
},
// New commands for sharding
AddShardNode {
node_id: String,
address: String,
capacity: u64,
},
RemoveShardNode {
node_id: String,
},
MigrateShard {
collection: String,
from_node: String,
to_node: String,
shard_key: String,
},
RebalanceCluster,
GetClusterStatus,
// Commands for constraints
AddConstraint {
collection: String,
constraint_name: String,
constraint_type: String,
field: String,
value: Vec<u8>,
},
RemoveConstraint {
collection: String,
constraint_name: String,
},
// Commands for compression
EnableCompression {
collection: String,
algorithm: String,
},
DisableCompression {
collection: String,
},
// Commands for global indexes
CreateGlobalIndex {
name: String,
field: String,
unique: bool,
},
QueryGlobalIndex {
index_name: String,
value: Vec<u8>,
},
}
/// Responses from the database
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
Success(Vec<u8>),
Error(String),
}
/// Replication message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMessage {
pub sequence: u64,
pub command: Command,
pub timestamp: i64,
}
/// Shard information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardInfo {
pub node_id: String,
pub address: String,
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
}
/// Cluster status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
pub nodes: Vec<ShardInfo>,
pub total_capacity: u64,
pub total_used: u64,
pub rebalance_needed: bool,
}
/// Wait-free message serialization
pub fn serialize<T: serde::Serialize>(value: &T) -> crate::common::error::Result<Vec<u8>> {
rmp_serde::to_vec(value)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
/// Wait-free message deserialization
pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::error::Result<T> {
rmp_serde::from_slice(bytes)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
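
The two helpers above are thin wrappers around rmp_serde; here is a minimal round-trip sketch of the same MessagePack encode/decode, using a simplified stand-in for Command so the example stays self-contained:

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
enum DemoCommand {
    Read { collection: String, id: String },
}

fn main() {
    let cmd = DemoCommand::Read {
        collection: "users".to_string(),
        id: "42".to_string(),
    };
    // Encode to MessagePack bytes, then decode back to the same value.
    let bytes = rmp_serde::to_vec(&cmd).expect("serialize");
    let decoded: DemoCommand = rmp_serde::from_slice(&bytes).expect("deserialize");
    assert_eq!(cmd, decoded);
}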

566
src/common/mod.rs Executable file

@@ -0,0 +1,566 @@
// src/common/mod.rs
//! Common modules for Futriix
//!
//! Contains the shared data structures, errors, protocol, and configuration
//! used across all system components in the wait-free architecture.
use thiserror::Error;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
/// Primary error type for Futriix
#[derive(Error, Debug)]
pub enum FutriixError {
#[error("Configuration error: {0}")]
ConfigError(String),
#[error("Database error: {0}")]
DatabaseError(String),
#[error("Lua error: {0}")]
LuaError(String),
#[error("Network error: {0}")]
NetworkError(String),
#[error("Replication error: {0}")]
ReplicationError(String),
#[error("HTTP error: {0}")]
HttpError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("CSV error: {0}")]
CsvError(String),
#[error("Unknown error: {0}")]
Unknown(String),
}
// Conversion from rlua::Error into FutriixError
impl From<rlua::Error> for FutriixError {
fn from(error: rlua::Error) -> Self {
FutriixError::LuaError(error.to_string())
}
}
/// Result type for Futriix
pub type Result<T> = std::result::Result<T, FutriixError>;
// Configuration module
pub mod config {
use super::*;
/// Server configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Config {
#[serde(default = "ServerConfig::default")]
pub server: ServerConfig,
#[serde(default = "ReplicationConfig::default")]
pub replication: ReplicationConfig,
#[serde(default = "ClusterConfig::default")] // Новая секция кластера
pub cluster: ClusterConfig,
#[serde(default = "LuaConfig::default")]
pub lua: LuaConfig,
#[serde(default = "AclConfig::default")]
pub acl: AclConfig,
#[serde(default = "TlsConfig::default")]
pub tls: TlsConfig,
#[serde(default = "CsvConfig::default")]
pub csv: CsvConfig,
}
/// Server parameter configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ServerConfig {
#[serde(default = "default_host")]
pub host: String,
#[serde(default = "default_port")]
pub port: u16,
#[serde(default)]
pub http_port: Option<u16>,
#[serde(default)]
pub https_port: Option<u16>,
#[serde(default)]
pub http2_enabled: Option<bool>,
#[serde(default = "default_http_enabled")] // Новая директива
pub http: bool,
#[serde(default = "default_https_enabled")] // Новая директива
pub https: bool,
}
/// Replication configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_master_nodes")]
pub master_nodes: Vec<String>,
#[serde(default = "default_sync_interval")]
pub sync_interval: u64,
}
/// Cluster configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ClusterConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_cluster_name")]
pub name: String,
}
/// Lua configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LuaConfig {
#[serde(default = "default_scripts_dir")]
pub scripts_dir: String,
#[serde(default)]
pub auto_execute: Vec<String>,
}
/// ACL configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AclConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_allowed_ips")]
pub allowed_ips: Vec<String>,
#[serde(default)]
pub denied_ips: Vec<String>,
}
/// TLS configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TlsConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_cert_path")]
pub cert_path: String,
#[serde(default = "default_key_path")]
pub key_path: String,
}
/// CSV configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CsvConfig {
#[serde(default = "default_csv_import_dir")]
pub import_dir: String,
#[serde(default = "default_csv_export_dir")]
pub export_dir: String,
#[serde(default = "default_max_csv_file_size")]
pub max_file_size: u64,
}
// Default-value functions
fn default_host() -> String {
"127.0.0.1".to_string()
}
fn default_port() -> u16 {
8081
}
fn default_http_enabled() -> bool {
true
}
fn default_https_enabled() -> bool {
false
}
fn default_master_nodes() -> Vec<String> {
vec!["127.0.0.1:8081".to_string(), "127.0.0.1:8083".to_string()]
}
fn default_sync_interval() -> u64 {
5000
}
fn default_cluster_name() -> String {
"futriix-default-cluster".to_string()
}
fn default_scripts_dir() -> String {
"lua_scripts".to_string()
}
fn default_allowed_ips() -> Vec<String> {
vec!["127.0.0.1".to_string(), "::1".to_string()]
}
fn default_cert_path() -> String {
"certs/cert.pem".to_string()
}
fn default_key_path() -> String {
"certs/key.pem".to_string()
}
fn default_csv_import_dir() -> String {
"/futriix/csv/import".to_string()
}
fn default_csv_export_dir() -> String {
"/futriix/csv/export".to_string()
}
fn default_max_csv_file_size() -> u64 {
104857600 // 100MB
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
host: default_host(),
port: default_port(),
http_port: Some(8082),
https_port: None,
http2_enabled: Some(false),
http: default_http_enabled(),
https: default_https_enabled(),
}
}
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
enabled: false,
master_nodes: default_master_nodes(),
sync_interval: default_sync_interval(),
}
}
}
impl Default for ClusterConfig {
fn default() -> Self {
Self {
enabled: false,
name: default_cluster_name(),
}
}
}
impl Default for LuaConfig {
fn default() -> Self {
Self {
scripts_dir: default_scripts_dir(),
auto_execute: vec!["init.lua".to_string()],
}
}
}
impl Default for AclConfig {
fn default() -> Self {
Self {
enabled: false,
allowed_ips: default_allowed_ips(),
denied_ips: vec![],
}
}
}
impl Default for TlsConfig {
fn default() -> Self {
Self {
enabled: false,
cert_path: default_cert_path(),
key_path: default_key_path(),
}
}
}
impl Default for CsvConfig {
fn default() -> Self {
Self {
import_dir: default_csv_import_dir(),
export_dir: default_csv_export_dir(),
max_file_size: default_max_csv_file_size(),
}
}
}
impl Default for Config {
fn default() -> Self {
Self {
server: ServerConfig::default(),
replication: ReplicationConfig::default(),
cluster: ClusterConfig::default(),
lua: LuaConfig::default(),
acl: AclConfig::default(),
tls: TlsConfig::default(),
csv: CsvConfig::default(),
}
}
}
impl Config {
/// Load configuration from a file
pub fn load(path: &str) -> Result<Self> {
let path = Path::new(path);
if !path.exists() {
// Create a default configuration
let default_config = Config::default();
let toml_content = toml::to_string_pretty(&default_config)
.map_err(|e| FutriixError::ConfigError(e.to_string()))?;
fs::write(path, toml_content)
.map_err(|e| FutriixError::ConfigError(e.to_string()))?;
println!("Created default configuration file: {}", path.display());
return Ok(default_config);
}
let content = fs::read_to_string(path)
.map_err(|e| FutriixError::ConfigError(e.to_string()))?;
// Parse the configuration, using default values where keys are absent
let mut config: Config = toml::from_str(&content)
.map_err(|e| FutriixError::ConfigError(e.to_string()))?;
// Ensure every field falls back to its default when missing from the file
if config.server.host.is_empty() {
config.server.host = default_host();
}
if config.server.port == 0 {
config.server.port = default_port();
}
if config.replication.master_nodes.is_empty() {
config.replication.master_nodes = default_master_nodes();
}
if config.replication.sync_interval == 0 {
config.replication.sync_interval = default_sync_interval();
}
if config.cluster.name.is_empty() {
config.cluster.name = default_cluster_name();
}
if config.lua.scripts_dir.is_empty() {
config.lua.scripts_dir = default_scripts_dir();
}
if config.acl.allowed_ips.is_empty() {
config.acl.allowed_ips = default_allowed_ips();
}
if config.tls.cert_path.is_empty() {
config.tls.cert_path = default_cert_path();
}
if config.tls.key_path.is_empty() {
config.tls.key_path = default_key_path();
}
if config.csv.import_dir.is_empty() {
config.csv.import_dir = default_csv_import_dir();
}
if config.csv.export_dir.is_empty() {
config.csv.export_dir = default_csv_export_dir();
}
if config.csv.max_file_size == 0 {
config.csv.max_file_size = default_max_csv_file_size();
}
Ok(config)
}
/// Save configuration to a file
#[allow(dead_code)]
pub fn save(&self, path: &str) -> Result<()> {
let toml_content = toml::to_string_pretty(self)
.map_err(|e| FutriixError::ConfigError(e.to_string()))?;
fs::write(path, toml_content)
.map_err(|e| FutriixError::ConfigError(e.to_string()))
}
}
}
// Protocol module
pub mod protocol {
use serde::{Deserialize, Serialize};
/// Commands executed against the database
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
// Basic CRUD commands
Create {
collection: String,
document: Vec<u8>,
},
Read {
collection: String,
id: String,
},
Update {
collection: String,
id: String,
document: Vec<u8>,
},
Delete {
collection: String,
id: String,
},
Query {
collection: String,
filter: Vec<u8>,
},
// Commands for procedures and transactions
CreateProcedure {
name: String,
code: Vec<u8>,
},
CallProcedure {
name: String,
},
BeginTransaction {
transaction_id: String,
},
CommitTransaction {
transaction_id: String,
},
RollbackTransaction {
transaction_id: String,
},
// Commands for indexes
CreateIndex {
collection: String,
index: crate::server::database::Index,
},
QueryByIndex {
collection: String,
index_name: String,
value: Vec<u8>,
},
// Commands for sharding with Raft
AddShardNode {
node_id: String,
address: String,
capacity: u64,
},
RemoveShardNode {
node_id: String,
},
MigrateShard {
collection: String,
from_node: String,
to_node: String,
shard_key: String,
},
RebalanceCluster,
GetClusterStatus,
StartElection, // New command: trigger a Raft election
GetRaftNodes, // New command: fetch the Raft nodes
// Commands for constraints
AddConstraint {
collection: String,
constraint_name: String,
constraint_type: String,
field: String,
value: Vec<u8>,
},
RemoveConstraint {
collection: String,
constraint_name: String,
},
// Commands for compression
EnableCompression {
collection: String,
algorithm: String,
},
DisableCompression {
collection: String,
},
// Commands for global indexes
CreateGlobalIndex {
name: String,
field: String,
unique: bool,
},
QueryGlobalIndex {
index_name: String,
value: Vec<u8>,
},
// New commands for CSV import/export
ImportCsv {
collection: String,
file_path: String,
},
ExportCsv {
collection: String,
file_path: String,
},
ListCsvFiles,
GetImportProgress {
collection: String,
},
}
/// Responses from the database
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
Success(Vec<u8>),
Error(String),
}
/// Replication message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMessage {
pub sequence: u64,
pub command: Command,
pub timestamp: i64,
}
/// Shard information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardInfo {
pub node_id: String,
pub address: String,
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
}
/// Raft node information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftNodeInfo {
pub node_id: String,
pub address: String,
pub state: String, // "leader", "follower", "candidate"
pub term: u64,
pub last_heartbeat: i64,
}
/// Cluster status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
pub nodes: Vec<ShardInfo>,
pub total_capacity: u64,
pub total_used: u64,
pub rebalance_needed: bool,
pub cluster_formed: bool, // New field: whether the cluster has formed
pub leader_exists: bool, // New field: whether a leader exists
pub raft_nodes: Vec<RaftNodeInfo>, // New field: list of Raft nodes
}
/// Wait-free message serialization
pub fn serialize<T: serde::Serialize>(value: &T) -> crate::common::Result<Vec<u8>> {
rmp_serde::to_vec(value)
.map_err(|e| crate::common::FutriixError::SerializationError(e.to_string()))
}
/// Wait-free message deserialization
pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::Result<T> {
rmp_serde::from_slice(bytes)
.map_err(|e| crate::common::FutriixError::SerializationError(e.to_string()))
}
}
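
The `#[serde(default = "...")]` attributes above are what allow Config::load to accept a partial file: any missing key falls back to its default function. A minimal standalone sketch of that pattern (field names are illustrative, not the server's actual types):

use serde::Deserialize;

fn default_host() -> String { "127.0.0.1".to_string() }
fn default_port() -> u16 { 8081 }

#[derive(Debug, Deserialize)]
struct Server {
    #[serde(default = "default_host")]
    host: String,
    #[serde(default = "default_port")]
    port: u16,
}

fn main() {
    // `port` is absent from the input, so serde calls default_port().
    let s: Server = toml::from_str(r#"host = "0.0.0.0""#).expect("valid TOML");
    assert_eq!(s.port, 8081);
    println!("{:?}", s);
}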

595
src/lua_shell.rs Executable file

@@ -0,0 +1,595 @@
// src/lua_shell.rs
//! Interactive Lua shell for Futriix
//!
//! Provides an interface for interacting with the database via Lua and
//! CRUD commands. Uses wait-free data access through atomic references.
#![allow(dead_code)]
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use serde_json::Value;
use crate::common::Result;
use crate::server::database::Database;
use crate::server::lua_engine::LuaEngine;
use crate::server::sharding::ShardingManager;
use crate::server::csv_import_export::CsvManager;
use crate::common::protocol;
use crate::server::database::{Index, IndexType};
/// Convert a hex color to an ANSI escape code
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
/// Print text in red for errors
fn print_error(text: &str) {
let red_color = hex_to_ansi("#FF0000");
println!("{}{}\x1b[0m", red_color, text);
}
/// Print text in green for success
fn print_success(text: &str) {
let green_color = hex_to_ansi("#00FF00");
println!("{}{}\x1b[0m", green_color, text);
}
/// Print text in blue for information
fn print_info(text: &str) {
let blue_color = hex_to_ansi("#00bfff");
println!("{}{}\x1b[0m", blue_color, text);
}
/// Print text in pistachio for the Lua prompt
fn print_pistachio(text: &str) {
let pistachio_color = hex_to_ansi("#93C572"); // pistachio
println!("{}{}\x1b[0m", pistachio_color, text);
}
/// Interactive Lua shell
pub struct LuaShell {
lua_engine: LuaEngine,
database: Arc<Database>,
sharding_manager: Arc<ShardingManager>,
csv_manager: Arc<CsvManager>,
inbox_mode: bool,
}
impl LuaShell {
pub fn new(
lua_engine: LuaEngine,
database: Arc<Database>,
sharding_manager: Arc<ShardingManager>,
csv_manager: Arc<CsvManager>,
) -> Self {
Self {
lua_engine,
database,
sharding_manager,
csv_manager,
inbox_mode: false,
}
}
/// Run the interactive shell
pub async fn run(&mut self) -> Result<()> {
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin).lines();
// Print a welcome message when the Lua interpreter starts
print_pistachio("Lua interpreter started. Type 'inbox.start' for database commands or Lua code.");
println!();
loop {
if self.inbox_mode {
let inbox_prompt_color = hex_to_ansi("#00bfff");
print!("{}futriix:~>\x1b[0m ", inbox_prompt_color);
} else {
// The Lua prompt is now pistachio-colored
let pistachio_color = hex_to_ansi("#93C572");
print!("{}lua>\x1b[0m ", pistachio_color);
}
let _ = std::io::Write::flush(&mut std::io::stdout());
let line = match reader.next_line().await {
Ok(Some(line)) => line,
Ok(None) => break,
Err(e) => {
eprintln!("Read error: {}", e);
continue;
}
};
let input = line.trim();
match input {
"exit" | "quit" => break,
"inbox.start" => {
self.inbox_mode = true;
print_success("Entering database mode. Type CRUD commands or 'inbox.stop' to exit.");
continue;
}
"inbox.stop" if self.inbox_mode => {
self.inbox_mode = false;
print_success("Exiting database mode. Back to Lua interpreter.");
continue;
}
"help" if self.inbox_mode => {
self.show_help().await?;
continue;
}
_ => {}
}
if self.inbox_mode {
self.handle_inbox_command(input).await?;
} else {
self.handle_lua_command(input).await?;
}
}
print_info("Shutting down Futriix server...");
Ok(())
}
/// Handle Lua commands
async fn handle_lua_command(&self, input: &str) -> Result<()> {
if input.is_empty() {
return Ok(());
}
match self.lua_engine.execute_script(input) {
Ok(_) => {}
Err(e) => {
let error_msg = e.to_string();
if error_msg.contains("Lua error: syntax error:") || error_msg.contains("Unknown command:") {
print_error(&error_msg);
} else {
eprintln!("Lua error: {}", e);
}
}
}
Ok(())
}
/// Handle inbox commands (CRUD plus the new commands)
async fn handle_inbox_command(&self, input: &str) -> Result<()> {
let parts: Vec<&str> = input.split_whitespace().collect();
if parts.is_empty() {
return Ok(());
}
match parts[0] {
// Basic CRUD commands
"create" => self.handle_create(parts).await,
"read" => self.handle_read(parts).await,
"update" => self.handle_update(parts).await,
"delete" => self.handle_delete(parts).await,
"list" => self.handle_list(parts).await,
// New cluster-management commands
"cluster.status" => self.handle_cluster_status(parts).await,
"add.node" => self.handle_add_node(parts).await,
"evict.node" => self.handle_evict_node(parts).await,
"list.raft.nodes" => self.handle_list_raft_nodes(parts).await,
"cluster.rebalance" => self.handle_cluster_rebalance(parts).await,
// New commands for CSV operations
"csv" => self.handle_csv(parts).await,
"help" => self.show_help().await,
_ => {
let error_msg = format!("Unknown command: {}. Type 'help' for available commands.", parts[0]);
print_error(&error_msg);
Ok(())
}
}
}
// New cluster-management methods
async fn handle_cluster_status(&self, _parts: Vec<&str>) -> Result<()> {
match self.sharding_manager.get_cluster_status() {
Ok(status) => {
println!("Cluster Status:");
println!(" Formed: {}", status.cluster_formed);
println!(" Leader Exists: {}", status.leader_exists);
println!(" Total Capacity: {}", status.total_capacity);
println!(" Total Used: {}", status.total_used);
println!(" Nodes: {}", status.nodes.len());
for node in status.nodes {
println!(" - {}: {} ({}% used)", node.node_id, node.address, (node.used as f64 / node.capacity as f64) * 100.0);
}
println!(" Raft Nodes: {}", status.raft_nodes.len());
for raft_node in status.raft_nodes {
println!(" - {}: {} (term: {}, state: {})", raft_node.node_id, raft_node.address, raft_node.term, raft_node.state);
}
}
Err(e) => {
println!("Error getting cluster status: {}", e);
}
}
Ok(())
}
async fn handle_add_node(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: add.node <node_url> or add.node <node_ip>");
return Ok(());
}
let node_address = parts[1].to_string();
let node_id = format!("node_{}", &uuid::Uuid::new_v4().to_string()[..8]);
match self.sharding_manager.add_node(node_id.clone(), node_address.clone(), 1024 * 1024 * 1024) {
Ok(_) => {
println!("Node '{}' added to cluster at address '{}'", node_id, node_address);
}
Err(e) => {
println!("Error adding node: {}", e);
}
}
Ok(())
}
async fn handle_evict_node(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: evict.node <node_url> or evict.node <node_ip>");
return Ok(());
}
let node_address = parts[1].to_string();
// Look up the node_id by address
let mut node_id_to_remove = None;
for entry in self.sharding_manager.get_nodes() {
if entry.address == node_address {
node_id_to_remove = Some(entry.node_id.clone());
break;
}
}
if let Some(node_id) = node_id_to_remove {
match self.sharding_manager.remove_node(&node_id) {
Ok(_) => {
println!("Node '{}' at address '{}' removed from cluster", node_id, node_address);
}
Err(e) => {
println!("Error removing node: {}", e);
}
}
} else {
println!("Node with address '{}' not found in cluster", node_address);
}
Ok(())
}
async fn handle_list_raft_nodes(&self, _parts: Vec<&str>) -> Result<()> {
let raft_nodes = self.sharding_manager.get_raft_nodes();
println!("Raft Nodes ({}):", raft_nodes.len());
for node in raft_nodes {
println!(" - {}: {} (term: {}, state: {:?}, last_heartbeat: {})",
node.node_id, node.address, node.term, node.state, node.last_heartbeat);
}
Ok(())
}
async fn handle_cluster_rebalance(&self, _parts: Vec<&str>) -> Result<()> {
match self.sharding_manager.rebalance_cluster() {
Ok(_) => {
println!("Cluster rebalancing completed successfully");
}
Err(e) => {
println!("Error rebalancing cluster: {}", e);
}
}
Ok(())
}
// New method for CSV operations
async fn handle_csv(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: csv import <collection> <file_path>");
println!(" csv export <collection> <file_path>");
println!(" csv list");
println!(" csv progress <collection>");
return Ok(());
}
match parts[1] {
"import" => {
if parts.len() < 4 {
println!("Usage: csv import <collection> <file_path>");
return Ok(());
}
let collection = parts[2].to_string();
let file_path = parts[3].to_string();
match self.csv_manager.import_csv(&collection, &file_path) {
Ok(count) => {
println!("Successfully imported {} records from '{}'", count, file_path);
}
Err(e) => {
println!("Error importing CSV: {}", e);
}
}
}
"export" => {
if parts.len() < 4 {
println!("Usage: csv export <collection> <file_path>");
return Ok(());
}
let collection = parts[2].to_string();
let file_path = parts[3].to_string();
match self.csv_manager.export_csv(&collection, &file_path) {
Ok(count) => {
println!("Successfully exported {} records to '{}'", count, file_path);
}
Err(e) => {
println!("Error exporting CSV: {}", e);
}
}
}
"list" => {
match self.csv_manager.list_csv_files() {
Ok(files) => {
if files.is_empty() {
println!("No CSV files found");
} else {
println!("CSV files:");
for file in files {
println!(" - {}", file);
}
}
}
Err(e) => {
println!("Error listing CSV files: {}", e);
}
}
}
"progress" => {
if parts.len() < 3 {
println!("Usage: csv progress <collection>");
return Ok(());
}
let collection = parts[2].to_string();
let progress = self.csv_manager.get_import_progress(&collection);
println!("Import progress for '{}': {:.2}%", collection, progress);
}
_ => {
println!("Usage: csv import <collection> <file_path>");
println!(" csv export <collection> <file_path>");
println!(" csv list");
println!(" csv progress <collection>");
}
}
Ok(())
}
// Basic CRUD methods (simplified)
async fn handle_create(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: create <collection> <json_data>");
return Ok(());
}
let collection = parts[1].to_string();
let document = parts[2..].join(" ").into_bytes();
let command = protocol::Command::Create {
collection,
document,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(id) = String::from_utf8(data) {
println!("Document created with ID: {}", id);
} else {
println!("Document created successfully");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_read(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: read <collection> <id>");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let command = protocol::Command::Read {
collection,
id,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(document) = String::from_utf8(data) {
println!("{}", document);
} else {
println!("Document read successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_update(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 4 {
println!("Usage: update <collection> <id> <json_data>");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let document = parts[3..].join(" ").into_bytes();
let command = protocol::Command::Update {
collection,
id,
document,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Document updated successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_delete(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: delete <collection> <id>");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let command = protocol::Command::Delete {
collection,
id,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Document deleted successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_list(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: list <collection> [filter]");
return Ok(());
}
let collection = parts[1].to_string();
let filter = if parts.len() > 2 {
parts[2..].join(" ").into_bytes()
} else {
vec![]
};
let command = protocol::Command::Query {
collection,
filter,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(documents) = String::from_utf8(data) {
// Use std::result::Result instead of our crate's Result alias
let parsed: std::result::Result<Value, _> = serde_json::from_str(&documents);
match parsed {
Ok(value) => {
println!("{}", serde_json::to_string_pretty(&value).unwrap());
}
Err(_) => {
println!("{}", documents);
}
}
} else {
println!("Documents read successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
/// Show command help
async fn show_help(&self) -> Result<()> {
println!("Available commands:");
println!(" Basic CRUD:");
println!(" create <collection> <json_data> - Create document");
println!(" read <collection> <id> - Read document");
println!(" update <collection> <id> <json> - Update document");
println!(" delete <collection> <id> - Delete document");
println!(" list <collection> [filter] - List documents");
println!(" Cluster Management:");
println!(" cluster.status - Show cluster status");
println!(" add.node <node_url> - Add node to cluster");
println!(" evict.node <node_url> - Remove node from cluster");
println!(" list.raft.nodes - List Raft nodes");
println!(" cluster.rebalance - Rebalance cluster (shards and nodes)");
println!(" CSV Operations:");
println!(" csv import <coll> <file> - Import CSV to collection");
println!(" csv export <coll> <file> - Export collection to CSV");
println!(" csv list - List CSV files");
println!(" csv progress <coll> - Show import progress");
println!(" Other:");
println!(" inbox.stop - Exit database mode");
println!(" help - Show this help");
Ok(())
}
}
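
hex_to_ansi emits a 24-bit ANSI "truecolor" escape sequence (ESC[38;2;R;G;Bm). A standalone check of the conversion, with the function copied verbatim from above and applied to the pistachio prompt color:

fn hex_to_ansi(hex_color: &str) -> String {
    let hex = hex_color.trim_start_matches('#');
    if hex.len() == 6 {
        if let (Ok(r), Ok(g), Ok(b)) = (
            u8::from_str_radix(&hex[0..2], 16),
            u8::from_str_radix(&hex[2..4], 16),
            u8::from_str_radix(&hex[4..6], 16),
        ) {
            return format!("\x1b[38;2;{};{};{}m", r, g, b);
        }
    }
    // Malformed input falls back to white.
    "\x1b[38;2;255;255;255m".to_string()
}

fn main() {
    // #93C572 decomposes to r=147, g=197, b=114.
    assert_eq!(hex_to_ansi("#93C572"), "\x1b[38;2;147;197;114m");
    println!("{}pistachio\x1b[0m", hex_to_ansi("#93C572"));
}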

158
src/main.rs Executable file

@@ -0,0 +1,158 @@
// src/main.rs
//! Main module of the Futriix server
//!
//! Application entry point: initializes the server and runs it.
//! Uses a wait-free architecture with lock-free data structures.
mod common;
mod server;
mod lua_shell;
use std::env;
use std::fs::OpenOptions;
use std::io::Write;
use crate::common::FutriixError;
/// Append a timestamped message to the log file
fn log_to_file(message: &str) {
match OpenOptions::new()
.create(true)
.append(true)
.open("futriix.log")
{
Ok(mut file) => {
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
Err(e) => eprintln!("Failed to write to log file: {}", e),
}
}
/// Simple container for command-line arguments
struct Args {
config: String,
debug: bool,
http_port: Option<u16>,
https_port: Option<u16>,
host: Option<String>,
}
/// Simple command-line argument parser
fn parse_args() -> Args {
let mut args = Args {
config: "config.toml".to_string(),
debug: false,
http_port: None,
https_port: None,
host: None,
};
let mut iter = env::args().skip(1);
while let Some(arg) = iter.next() {
match arg.as_str() {
"--config" | "-c" => {
if let Some(value) = iter.next() {
args.config = value;
}
}
"--debug" | "-d" => {
args.debug = true;
}
"--http-port" => {
if let Some(value) = iter.next() {
if let Ok(port) = value.parse() {
args.http_port = Some(port);
}
}
}
"--https-port" => {
if let Some(value) = iter.next() {
if let Ok(port) = value.parse() {
args.https_port = Some(port);
}
}
}
"--host" => {
if let Some(value) = iter.next() {
args.host = Some(value);
}
}
_ => {
if arg.starts_with("--config=") {
args.config = arg.trim_start_matches("--config=").to_string();
} else if arg.starts_with("-c=") {
args.config = arg.trim_start_matches("-c=").to_string();
}
}
}
}
args
}
/// Print text with an ANSI color
fn print_colored(text: &str, ansi_color: &str) {
println!("{}{}\x1b[0m", ansi_color, text);
}
/// Convert a HEX color to an ANSI escape code
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
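// Worked example: hex_to_ansi("#00bfff") yields "\x1b[38;2;0;191;255m",
// the 24-bit foreground escape for RGB (0, 191, 255).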
#[tokio::main]
async fn main() -> Result<(), FutriixError> {
// Initialize file logging
log_to_file("Starting Futriix server");
// Print the welcome banner in color #00bfff before loading the configuration
let color_code = hex_to_ansi("#00bfff");
println!(); // blank line before the banner
print_colored("Futriix Database Server", &color_code);
print_colored("futriix 3i²(by 26.11.2025)", &color_code);
println!(); // blank line after the banner
// Parse command-line arguments
let args = parse_args();
let config_path = args.config;
let message = format!("Loading configuration from: {}", config_path);
println!("{}", message);
log_to_file(&message);
// Create and run the server
match server::FutriixServer::new(&config_path).await {
Ok(server) => {
log_to_file("Server created successfully");
if let Err(e) = server.run().await {
let error_message = format!("Server error: {}", e);
eprintln!("{}", error_message);
log_to_file(&error_message);
std::process::exit(1);
}
}
Err(e) => {
let error_message = format!("Failed to create server: {}", e);
eprintln!("{}", error_message);
log_to_file(&error_message);
std::process::exit(1);
}
}
log_to_file("Futriix server stopped");
Ok(())
}

219
src/server/csv_import_export.rs Executable file
View File

@ -0,0 +1,219 @@
// src/server/csv_import_export.rs
//! Module for importing/exporting data in CSV format
//!
//! Provides lock-free CSV import into the database
//! and export of collections to CSV.
use std::sync::Arc;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use csv::{Reader, Writer};
use serde_json::Value;
use dashmap::DashMap;
use crate::common::Result;
use crate::common::config::CsvConfig;
use crate::server::database::Database;
/// Manager for CSV operations
#[derive(Clone)]
pub struct CsvManager {
database: Arc<Database>,
config: CsvConfig,
import_progress: Arc<DashMap<String, f64>>, // lock-free progress tracking (percent)
}
impl CsvManager {
/// Create a new CSV manager
pub fn new(database: Arc<Database>, config: CsvConfig) -> Self {
Self {
database,
config,
import_progress: Arc::new(DashMap::new()),
}
}
/// Import a CSV file into a collection
pub fn import_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
println!("Importing CSV file '{}' into collection '{}'", file_path, collection_name);
let file = File::open(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
let mut reader = Reader::from_reader(BufReader::new(file));
// Fetch the headers up front to avoid overlapping borrows of the reader
let headers: Vec<String> = reader.headers()
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?
.iter()
.map(|s| s.to_string())
.collect();
let mut record_count = 0;
// Set the initial progress
self.import_progress.insert(collection_name.to_string(), 0.0);
for result in reader.records() {
let record = result
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
// Convert the CSV record into a JSON document
let mut document = serde_json::Map::new();
for (i, field) in record.iter().enumerate() {
// Fall back to a synthetic column name when the record has more fields
// than the header row (cloning avoids borrowing a temporary `format!`
// result from the else branch)
let header = headers
.get(i)
.cloned()
.unwrap_or_else(|| format!("field_{}", i));
// Best-effort type inference: number, then boolean, otherwise string
let value = if let Ok(num) = field.parse::<f64>() {
Value::Number(serde_json::Number::from_f64(num).unwrap_or(serde_json::Number::from(0)))
} else if field.eq_ignore_ascii_case("true") {
Value::Bool(true)
} else if field.eq_ignore_ascii_case("false") {
Value::Bool(false)
} else {
Value::String(field.to_string())
};
document.insert(header, value);
}
// Persist the document to the database
let json_value = Value::Object(document);
let json_string = serde_json::to_string(&json_value)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
let command = crate::common::protocol::Command::Create {
collection: collection_name.to_string(),
document: json_string.into_bytes(),
};
self.database.execute_command(command)?;
record_count += 1;
// Update progress every 100 records; the total row count is unknown while
// streaming, so this is a rough percentage assuming ~1000 rows, capped
// below 100 until the import actually completes
if record_count % 100 == 0 {
let progress = ((record_count as f64) / 1000.0 * 100.0).min(99.0);
self.import_progress.insert(collection_name.to_string(), progress);
println!("Imported {} records...", record_count);
}
}
// Mark the import as complete
self.import_progress.insert(collection_name.to_string(), 100.0);
println!("Successfully imported {} records into collection '{}'", record_count, collection_name);
Ok(record_count)
}
/// Export a collection to a CSV file
pub fn export_csv(&self, collection_name: &str, file_path: &str) -> Result<usize> {
println!("Exporting collection '{}' to CSV file '{}'", collection_name, file_path);
let file = File::create(file_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
let mut writer = Writer::from_writer(BufWriter::new(file));
// Fetch all documents from the collection
let command = crate::common::protocol::Command::Query {
collection: collection_name.to_string(),
filter: vec![], // no filter
};
};
let response = self.database.execute_command(command)?;
let documents = match response {
crate::common::protocol::Response::Success(data) => {
serde_json::from_slice::<Vec<Value>>(&data)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?
}
crate::common::protocol::Response::Error(e) => {
return Err(crate::common::FutriixError::DatabaseError(e));
}
};
if documents.is_empty() {
println!("Collection '{}' is empty", collection_name);
return Ok(0);
}
// Derive the headers from the first document; fields absent from it are not exported
let first_doc = &documents[0];
let headers: Vec<String> = if let Value::Object(obj) = first_doc {
obj.keys().map(|k| k.to_string()).collect()
} else {
vec!["data".to_string()]
};
// Write the header row
writer.write_record(&headers)
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
let mut record_count = 0;
// Write the data rows
for document in documents {
if let Value::Object(obj) = document {
let mut record = Vec::new();
// Preserve the field order given by the headers
for header in &headers {
let value = obj.get(header).unwrap_or(&Value::Null);
let value_str = match value {
Value::String(s) => s.clone(),
Value::Number(n) => n.to_string(),
Value::Bool(b) => b.to_string(),
Value::Null => "".to_string(),
_ => value.to_string(),
};
record.push(value_str);
}
writer.write_record(&record)
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
record_count += 1;
}
}
writer.flush()
.map_err(|e| crate::common::FutriixError::CsvError(e.to_string()))?;
println!("Successfully exported {} records to '{}'", record_count, file_path);
Ok(record_count)
}
/// Get the import progress (percent)
pub fn get_import_progress(&self, collection_name: &str) -> f64 {
self.import_progress.get(collection_name)
.map(|entry| *entry.value())
.unwrap_or(0.0)
}
/// List the CSV files in the import directory
pub fn list_csv_files(&self) -> Result<Vec<String>> {
let csv_dir = &self.config.import_dir;
let mut csv_files = Vec::new();
if let Ok(entries) = std::fs::read_dir(csv_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
if let Some(extension) = path.extension() {
if extension == "csv" {
if let Some(file_name) = path.file_name() {
csv_files.push(file_name.to_string_lossy().to_string());
}
}
}
}
}
}
Ok(csv_files)
}
}
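// A minimal usage sketch (collection and file names are hypothetical; assumes
// `database` and `config` are already constructed, as in FutriixServer::new):
//
//   let csv = CsvManager::new(database.clone(), config.csv.clone());
//   let imported = csv.import_csv("users", "/futriix/csv/import/users.csv")?;
//   println!("progress: {:.1}%", csv.get_import_progress("users"));
//   let exported = csv.export_csv("users", "/futriix/csv/export/users.csv")?;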

718
src/server/database.rs Executable file
View File

@ -0,0 +1,718 @@
// src/server/database.rs
//! Wait-free document-oriented Futriix database
//!
//! Implements wait-free data access using atomic references and lock-free
//! data structures for maximum performance.
//! Automatically adds timestamps to all document operations.
#![allow(unused_imports)]
#![allow(dead_code)]
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use serde::{Serialize, Deserialize};
use serde_json::Value;
use uuid::Uuid;
use dashmap::DashMap;
use crate::common::Result;
use crate::common::protocol;
/// Trigger events for collections
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TriggerEvent {
BeforeCreate,
AfterCreate,
BeforeUpdate,
AfterUpdate,
BeforeDelete,
AfterDelete,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trigger {
pub name: String,
pub event: TriggerEvent,
pub collection: String,
pub lua_code: String,
}
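// A minimal sketch (hypothetical values) of registering a trigger via the
// Database::add_trigger helper defined later in this file:
//
//   db.add_trigger(Trigger {
//       name: "audit_users".into(),
//       event: TriggerEvent::AfterCreate,
//       collection: "users".into(),
//       lua_code: "futriix_log('user created')".into(),
//   })?;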
/// Index types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
Primary,
Secondary,
}
/// Index definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Index {
pub name: String,
pub index_type: IndexType,
pub field: String,
pub unique: bool,
}
/// Wait-free collection of documents
#[derive(Clone)]
pub struct Collection {
name: String,
documents: Arc<RwLock<std::collections::HashMap<String, Vec<u8>>>>,
sequence: Arc<AtomicU64>,
triggers: Arc<RwLock<Vec<Trigger>>>,
indexes: Arc<RwLock<std::collections::HashMap<String, Index>>>,
index_data: Arc<DashMap<String, std::collections::HashMap<String, Vec<String>>>>,
}
impl Collection {
/// Create a new wait-free collection
pub fn new(name: String) -> Self {
Self {
name,
documents: Arc::new(RwLock::new(std::collections::HashMap::new())),
sequence: Arc::new(AtomicU64::new(0)),
triggers: Arc::new(RwLock::new(Vec::new())),
indexes: Arc::new(RwLock::new(std::collections::HashMap::new())),
index_data: Arc::new(DashMap::new()),
}
}
/// Log a collection operation with a timestamp
fn log_operation(&self, operation: &str, id: &str) {
use std::fs::OpenOptions;
use std::io::Write;
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let log_message = format!("[{}] Collection: '{}', Operation: '{}', Document ID: '{}'\n",
timestamp, self.name, operation, id);
// Append to the log file
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open("futriix.log")
{
let _ = file.write_all(log_message.as_bytes());
}
// Echo to the console for debugging
println!("{}", log_message.trim());
}
/// Add timestamp metadata to a document
fn add_timestamp_to_document(&self, document: Vec<u8>, operation: &str) -> Result<Vec<u8>> {
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
// Parse the document as JSON
let mut doc_value: Value = serde_json::from_slice(&document)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
// Attach the timestamp metadata
if let Value::Object(ref mut obj) = doc_value {
obj.insert("_timestamp".to_string(), Value::String(timestamp.clone()));
obj.insert("_operation".to_string(), Value::String(operation.to_string()));
}
// Serialize back to bytes
serde_json::to_vec(&doc_value)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))
}
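// Illustrative effect (the timestamp value is runtime-dependent):
//   {"name":"alice"}  becomes
//   {"name":"alice","_timestamp":"2025-11-22 00:09:24.123","_operation":"create"}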
/// Add a trigger
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
let mut triggers = self.triggers.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
triggers.push(trigger);
Ok(())
}
/// Get the triggers for an event
#[allow(dead_code)]
pub fn get_triggers_for_event(&self, event: TriggerEvent) -> Result<Vec<Trigger>> {
let triggers = self.triggers.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(triggers.iter()
.filter(|t| t.event == event)
.cloned()
.collect())
}
/// Create an index
pub fn create_index(&self, index: Index) -> Result<()> {
let mut indexes = self.indexes.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if indexes.contains_key(&index.name) {
return Err(crate::common::FutriixError::DatabaseError(
format!("Index already exists: {}", index.name)
));
}
// Create the backing store for the index data
self.index_data.insert(index.name.clone(), std::collections::HashMap::new());
let index_clone = index.clone();
indexes.insert(index.name.clone(), index);
// Rebuild the index over the existing documents
self.rebuild_index(&index_clone.name)?;
Ok(())
}
/// Rebuild an index
fn rebuild_index(&self, index_name: &str) -> Result<()> {
let indexes = self.indexes.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let index = indexes.get(index_name)
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
format!("Index not found: {}", index_name)
))?;
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut index_map = std::collections::HashMap::new();
for (id, document_bytes) in documents.iter() {
if let Ok(document) = serde_json::from_slice::<Value>(document_bytes) {
if let Some(field_value) = document.get(&index.field) {
// Convert the value to a string so it can key the HashMap
let value_str = field_value.to_string();
let entry = index_map.entry(value_str).or_insert_with(Vec::new);
entry.push(id.clone());
// Enforce uniqueness for unique indexes
if index.unique && entry.len() > 1 {
return Err(crate::common::FutriixError::DatabaseError(
format!("Duplicate value {} for unique index {}", field_value, index_name)
));
}
}
}
}
self.index_data.insert(index_name.to_string(), index_map);
Ok(())
}
/// Update the indexes when a document changes
fn update_indexes(&self, old_document: Option<&[u8]>, new_document: &[u8], document_id: &str) -> Result<()> {
let indexes = self.indexes.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// An empty `new_document` signals a delete: there is no new JSON to parse
// and no new entries to add (parsing &[] would otherwise fail every delete)
let new_doc_value: Option<Value> = if new_document.is_empty() {
None
} else {
Some(serde_json::from_slice(new_document)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?)
};
let old_doc_value: Option<Value> = old_document
.and_then(|doc| serde_json::from_slice(doc).ok());
for (index_name, index) in indexes.iter() {
if let Some(mut index_map) = self.index_data.get_mut(index_name) {
// Remove the old values from the index
if let Some(old_doc) = &old_doc_value {
if let Some(old_value) = old_doc.get(&index.field) {
let old_value_str = old_value.to_string();
if let Some(entries) = index_map.get_mut(&old_value_str) {
entries.retain(|id| id != document_id);
if entries.is_empty() {
index_map.remove(&old_value_str);
}
}
}
}
// Add the new values to the index (skipped on delete)
if let Some(new_value) = new_doc_value.as_ref().and_then(|doc| doc.get(&index.field)) {
let new_value_str = new_value.to_string();
let entries = index_map.entry(new_value_str).or_insert_with(Vec::new);
// Enforce uniqueness
if index.unique && !entries.is_empty() && entries[0] != document_id {
return Err(crate::common::FutriixError::DatabaseError(
format!("Duplicate value {} for unique index {}", new_value, index_name)
));
}
if !entries.contains(&document_id.to_string()) {
entries.push(document_id.to_string());
}
}
}
}
Ok(())
}
/// Query by index
pub fn query_by_index(&self, index_name: &str, value: &Value) -> Result<Vec<String>> {
let index_map = self.index_data.get(index_name)
.ok_or_else(|| crate::common::FutriixError::DatabaseError(
format!("Index not found: {}", index_name)
))?;
let value_str = value.to_string();
Ok(index_map.get(&value_str).cloned().unwrap_or_default())
}
/// Wait-free creation of a document with a timestamp
pub fn create_document(&self, document: Vec<u8>) -> Result<String> {
let id = Uuid::new_v4().to_string();
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
// Attach the timestamp to the document
let document_with_timestamp = self.add_timestamp_to_document(document, "create")?;
let mut documents = self.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Check uniqueness constraints before inserting
self.update_indexes(None, &document_with_timestamp, &id)?;
documents.insert(id.clone(), document_with_timestamp);
// Log the operation
self.log_operation("create", &id);
println!("Document created in collection '{}' with ID: {} (seq: {})", self.name, id, seq);
Ok(id)
}
/// Wait-free read of a document
pub fn read_document(&self, id: &str) -> Result<Option<Vec<u8>>> {
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Log the read operation
self.log_operation("read", id);
Ok(documents.get(id).cloned())
}
/// Wait-free update of a document with a timestamp
pub fn update_document(&self, id: &str, document: Vec<u8>) -> Result<()> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
// Attach the timestamp to the document
let document_with_timestamp = self.add_timestamp_to_document(document, "update")?;
let mut documents = self.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(old_document) = documents.get(id) {
// Update the indexes (`as_slice` is needed because &Vec<u8> does not
// coerce to &[u8] through Option)
self.update_indexes(Some(old_document.as_slice()), &document_with_timestamp, id)?;
documents.insert(id.to_string(), document_with_timestamp);
// Log the operation
self.log_operation("update", id);
println!("Document updated in collection '{}': {} (seq: {})", self.name, id, seq);
Ok(())
} else {
Err(crate::common::FutriixError::DatabaseError(
format!("Document not found: {}", id)
))
}
}
/// Wait-free deletion of a document with a timestamp
pub fn delete_document(&self, id: &str) -> Result<()> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
let mut documents = self.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
if let Some(old_document) = documents.get(id) {
// Drop the document from the indexes; the empty slice tells
// update_indexes that there is no replacement document
self.update_indexes(Some(old_document.as_slice()), &[], id)?;
documents.remove(id);
// Log the operation
self.log_operation("delete", id);
println!("Document deleted from collection '{}': {} (seq: {})", self.name, id, seq);
Ok(())
} else {
Err(crate::common::FutriixError::DatabaseError(
format!("Document not found: {}", id)
))
}
}
/// Wait-free query over documents
pub fn query_documents(&self, _filter: Vec<u8>) -> Result<Vec<Vec<u8>>> {
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
// Log the query operation
self.log_operation("query", "multiple");
// TODO: implement wait-free filtering based on `filter`
let documents: Vec<Vec<u8>> = documents.values().cloned().collect();
Ok(documents)
}
/// Get the collection name (wait-free)
#[allow(dead_code)]
pub fn get_name(&self) -> &str {
&self.name
}
/// Get the number of documents (wait-free)
#[allow(dead_code)]
pub fn count_documents(&self) -> Result<usize> {
let documents = self.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
Ok(documents.len())
}
}
/// Wait-free database
#[derive(Clone)]
pub struct Database {
collections: Arc<DashMap<String, Collection>>,
procedures: Arc<DashMap<String, Vec<u8>>>,
transactions: Arc<DashMap<String, Vec<protocol::Command>>>,
}
impl Database {
/// Create a new wait-free database
pub fn new() -> Self {
Self {
collections: Arc::new(DashMap::new()),
procedures: Arc::new(DashMap::new()),
transactions: Arc::new(DashMap::new()),
}
}
/// Wait-free lookup-or-create of a collection
pub fn get_collection(&self, name: &str) -> Collection {
if let Some(collection) = self.collections.get(name) {
return collection.clone();
}
// Create the collection lock-free; two racing callers may both insert,
// and the last write wins (benign while the collection is still empty)
let new_collection = Collection::new(name.to_string());
self.collections.insert(name.to_string(), new_collection.clone());
new_collection
}
/// Wait-free command execution
pub fn execute_command(&self, command: protocol::Command) -> Result<protocol::Response> {
match command {
protocol::Command::Create { collection, document } => {
let coll = self.get_collection(&collection);
match coll.create_document(document) {
Ok(id) => Ok(protocol::Response::Success(id.into_bytes())),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Read { collection, id } => {
let coll = self.get_collection(&collection);
match coll.read_document(&id) {
Ok(Some(document)) => Ok(protocol::Response::Success(document)),
Ok(None) => Ok(protocol::Response::Error(format!("Document not found: {}", id))),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Update { collection, id, document } => {
let coll = self.get_collection(&collection);
match coll.update_document(&id, document) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Delete { collection, id } => {
let coll = self.get_collection(&collection);
match coll.delete_document(&id) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::Query { collection, filter } => {
let coll = self.get_collection(&collection);
match coll.query_documents(filter) {
Ok(documents) => {
let json_docs: Vec<Value> = documents.into_iter()
.filter_map(|doc| serde_json::from_slice(&doc).ok())
.collect();
match serde_json::to_vec(&json_docs) {
Ok(data) => Ok(protocol::Response::Success(data)),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::CreateProcedure { name, code } => {
self.procedures.insert(name, code);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CallProcedure { name } => {
if self.procedures.contains_key(&name) {
// TODO: execute the procedure's Lua code
Ok(protocol::Response::Success(format!("Procedure {} executed", name).into_bytes()))
} else {
Ok(protocol::Response::Error(format!("Procedure not found: {}", name)))
}
}
protocol::Command::BeginTransaction { transaction_id } => {
if self.transactions.contains_key(&transaction_id) {
return Ok(protocol::Response::Error("Transaction already exists".to_string()));
}
self.transactions.insert(transaction_id, Vec::new());
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CommitTransaction { transaction_id } => {
if let Some((_, commands)) = self.transactions.remove(&transaction_id) {
// Execute all of the transaction's commands wait-free
for cmd in commands {
if let Err(e) = self.execute_command(cmd) {
return Ok(protocol::Response::Error(format!("Transaction failed: {}", e)));
}
}
Ok(protocol::Response::Success(vec![]))
} else {
Ok(protocol::Response::Error("Transaction not found".to_string()))
}
}
protocol::Command::RollbackTransaction { transaction_id } => {
if self.transactions.remove(&transaction_id).is_some() {
Ok(protocol::Response::Success(vec![]))
} else {
Ok(protocol::Response::Error("Transaction not found".to_string()))
}
}
protocol::Command::CreateIndex { collection, index } => {
let coll = self.get_collection(&collection);
match coll.create_index(index) {
Ok(_) => Ok(protocol::Response::Success(vec![])),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::QueryByIndex { collection, index_name, value } => {
let coll = self.get_collection(&collection);
let value: Value = serde_json::from_slice(&value)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
match coll.query_by_index(&index_name, &value) {
Ok(document_ids) => {
let result = serde_json::to_vec(&document_ids)
.map_err(|e| crate::common::FutriixError::DatabaseError(e.to_string()))?;
Ok(protocol::Response::Success(result))
}
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
// Handlers for the new sharding, constraint, compression, and global index commands
protocol::Command::AddShardNode { node_id, address, capacity } => {
// TODO: implement adding a shard node
println!("Adding shard node: {} at {} with capacity {}", node_id, address, capacity);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::RemoveShardNode { node_id } => {
// TODO: implement removing a shard node
println!("Removing shard node: {}", node_id);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::MigrateShard { collection, from_node, to_node, shard_key } => {
// TODO: implement shard migration
println!("Migrating shard from {} to {} for collection {} with key {}", from_node, to_node, collection, shard_key);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::RebalanceCluster => {
// TODO: implement cluster rebalancing
println!("Rebalancing cluster");
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::GetClusterStatus => {
// TODO: implement fetching the real cluster status
let status = protocol::ClusterStatus {
nodes: vec![],
total_capacity: 0,
total_used: 0,
rebalance_needed: false,
cluster_formed: false,
leader_exists: false,
raft_nodes: vec![],
};
match protocol::serialize(&status) {
Ok(data) => Ok(protocol::Response::Success(data)),
Err(e) => Ok(protocol::Response::Error(e.to_string())),
}
}
protocol::Command::StartElection => {
// TODO: implement Raft elections
println!("Starting Raft election");
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::GetRaftNodes => {
// TODO: implement fetching the Raft nodes
println!("Getting Raft nodes");
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::AddConstraint { collection, constraint_name, constraint_type, field, value } => {
// TODO: implement adding a constraint
println!("Adding constraint {} to collection {}: {} {} with value {:?}", constraint_name, collection, constraint_type, field, value);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::RemoveConstraint { collection, constraint_name } => {
// TODO: implement removing a constraint
println!("Removing constraint {} from collection {}", constraint_name, collection);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::EnableCompression { collection, algorithm } => {
// TODO: implement enabling compression
println!("Enabling {} compression for collection {}", algorithm, collection);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::DisableCompression { collection } => {
// TODO: implement disabling compression
println!("Disabling compression for collection {}", collection);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::CreateGlobalIndex { name, field, unique } => {
// TODO: implement creating a global index
println!("Creating global index {} on field {} (unique: {})", name, field, unique);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::QueryGlobalIndex { index_name, value } => {
// TODO: implement querying a global index
println!("Querying global index {} with value {:?}", index_name, value);
Ok(protocol::Response::Success(vec![]))
}
// New CSV commands
protocol::Command::ImportCsv { collection, file_path } => {
// TODO: integrate with CsvManager
println!("Importing CSV to collection '{}' from '{}'", collection, file_path);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::ExportCsv { collection, file_path } => {
// TODO: integrate with CsvManager
println!("Exporting collection '{}' to CSV file '{}'", collection, file_path);
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::ListCsvFiles => {
// TODO: integrate with CsvManager
println!("Listing CSV files");
Ok(protocol::Response::Success(vec![]))
}
protocol::Command::GetImportProgress { collection } => {
// TODO: integrate with CsvManager
println!("Getting import progress for '{}'", collection);
Ok(protocol::Response::Success(vec![]))
}
}
}
/// Get database statistics (wait-free)
#[allow(dead_code)]
pub fn get_stats(&self) -> Result<std::collections::HashMap<String, usize>> {
let mut stats = std::collections::HashMap::new();
stats.insert("collections".to_string(), self.collections.len());
stats.insert("procedures".to_string(), self.procedures.len());
stats.insert("active_transactions".to_string(), self.transactions.len());
// Count the documents across all collections
let total_documents: usize = self.collections.iter()
.map(|entry| entry.value().count_documents().unwrap_or(0))
.sum();
stats.insert("total_documents".to_string(), total_documents);
Ok(stats)
}
/// Create a database backup
pub fn create_backup(&self) -> Result<std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>> {
let mut backup = std::collections::HashMap::new();
for entry in self.collections.iter() {
let name = entry.key().clone();
let collection = entry.value();
let documents = collection.documents.read()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
let mut collection_backup = std::collections::HashMap::new();
for (id, document) in documents.iter() {
collection_backup.insert(id.clone(), document.clone());
}
backup.insert(name, collection_backup);
}
Ok(backup)
}
/// Restore from a backup
pub fn restore_from_backup(&self, backup: std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>>) -> Result<()> {
// Clear the existing collections
self.collections.clear();
// Restore the data from the backup
for (collection_name, documents) in backup {
let collection = Collection::new(collection_name.clone());
{
let mut collection_docs = collection.documents.write()
.map_err(|e| crate::common::FutriixError::IoError(
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
))?;
for (id, document) in documents {
collection_docs.insert(id, document);
}
}
self.collections.insert(collection_name, collection);
}
Ok(())
}
/// Add a trigger to a collection
pub fn add_trigger(&self, trigger: Trigger) -> Result<()> {
let collection = self.get_collection(&trigger.collection);
collection.add_trigger(trigger)
}
}
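// A minimal usage sketch (collection name and payload are hypothetical):
//
//   let db = Database::new();
//   let resp = db.execute_command(protocol::Command::Create {
//       collection: "users".to_string(),
//       document: br#"{"name": "alice"}"#.to_vec(),
//   })?;
//   // On success the response body is the new document's UUID
//   if let protocol::Response::Success(id) = resp {
//       println!("created {}", String::from_utf8_lossy(&id));
//   }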

374
src/server/http.rs Executable file
View File

@ -0,0 +1,374 @@
// src/server/http.rs
//! HTTP/HTTPS server with wait-free request handling
#![allow(dead_code)]
#![allow(unused_variables)]
use std::sync::Arc;
use hyper::{Body, Request, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use crate::common::Result;
use crate::server::database::Database;
/// Static file serving configuration
#[derive(Clone)]
pub struct StaticFilesConfig {
pub enabled: bool,
pub directory: String,
}
impl Default for StaticFilesConfig {
fn default() -> Self {
Self {
enabled: true,
directory: "static".to_string(),
}
}
}
/// TLS configuration
#[derive(Clone)]
pub struct TlsConfig {
pub enabled: bool,
pub cert_path: String,
pub key_path: String,
}
impl Default for TlsConfig {
fn default() -> Self {
Self {
enabled: true,
cert_path: "certs/cert.pem".to_string(),
key_path: "certs/key.pem".to_string(),
}
}
}
/// HTTP server configuration
#[derive(Clone)]
pub struct HttpConfig {
pub enabled: bool,
pub port: u16,
pub http2_enabled: bool,
}
/// ACL configuration
#[derive(Clone)]
pub struct AclConfig {
pub enabled: bool,
pub allowed_ips: Vec<String>,
pub denied_ips: Vec<String>,
}
impl Default for AclConfig {
fn default() -> Self {
Self {
enabled: false,
allowed_ips: vec!["127.0.0.1".to_string(), "::1".to_string()],
denied_ips: vec![],
}
}
}
/// Wait-free HTTP request handler with ACL support
async fn handle_request(
req: Request<Body>,
db: Arc<Database>,
static_config: StaticFilesConfig,
acl_config: AclConfig,
) -> Result<Response<Body>> {
// ACL check, if enabled
if acl_config.enabled {
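// NOTE: this lookup assumes something has inserted the peer SocketAddr into
// the request extensions; hyper does not do that by itself, and the
// make_service_fn below does not either, so in the code as written the ACL
// branch is effectively skipped for every request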
if let Some(remote_addr) = req.extensions().get::<std::net::SocketAddr>() {
let ip = remote_addr.ip().to_string();
// Check the deny list
if acl_config.denied_ips.contains(&ip) {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::from("Access denied"))
.unwrap());
}
// Check the allow list (when it is non-empty)
if !acl_config.allowed_ips.is_empty() && !acl_config.allowed_ips.contains(&ip) {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::from("Access denied"))
.unwrap());
}
}
}
let path = req.uri().path();
println!("HTTP Request: {} {}", req.method(), path);
// Handle API requests
if path.starts_with("/api/") {
handle_api_request(req, db).await
}
// Serve static files
else if static_config.enabled {
handle_static_file(path, static_config).await
}
// Root path
else if path == "/" {
// Return a simple HTML page for the root path
Ok(Response::builder()
.header("Content-Type", "text/html; charset=utf-8")
.body(Body::from(r#"
<!DOCTYPE html>
<html>
<head>
<title>Futriix Database Server</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
h1 { color: #00bfff; }
</style>
</head>
<body>
<h1>Futriix Database Server</h1>
<p>Server is running successfully!</p>
<p>Try accessing <a href="/index.html">/index.html</a> for the main interface.</p>
</body>
</html>
"#))
.unwrap())
}
// 404 for everything else
else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("Not Found"))
.unwrap())
}
}
/// Wait-free handling of API requests
async fn handle_api_request(
_req: Request<Body>,
_db: Arc<Database>,
) -> Result<Response<Body>> {
// TODO: implement wait-free CRUD handling over HTTP
Ok(Response::builder()
.header("Content-Type", "application/json")
.body(Body::from(r#"{"status": "ok", "message": "Futriix Server is running"}"#))
.unwrap())
}
/// Wait-free static file serving
async fn handle_static_file(
path: &str,
config: StaticFilesConfig,
) -> Result<Response<Body>> {
// Strip the leading slash (note that the path is not sanitized against
// ".." traversal here)
let clean_path = path.trim_start_matches('/');
// Fall back to index.html for empty or root paths
let file_path = if clean_path.is_empty() || clean_path == "/" {
format!("{}/index.html", config.directory)
} else {
format!("{}/{}", config.directory, clean_path)
};
// Debug logging
println!("Trying to serve static file: {}", file_path);
match File::open(&file_path).await {
Ok(mut file) => {
let mut contents = Vec::new();
if let Err(e) = file.read_to_end(&mut contents).await {
eprintln!("Failed to read file {}: {}", file_path, e);
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from("Internal server error"))
.unwrap());
}
let content_type = get_content_type(&file_path);
println!("Successfully served static file: {} ({} bytes)", file_path, contents.len());
Ok(Response::builder()
.header("Content-Type", content_type)
.body(Body::from(contents))
.unwrap())
}
Err(e) => {
eprintln!("File not found: {} (error: {})", file_path, e);
// Serve a simple fallback page when index.html itself is missing
if clean_path == "index.html" {
let fallback_html = r#"
<!DOCTYPE html>
<html>
<head>
<title>Futriix Database Server</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
h1 { color: #00bfff; }
</style>
</head>
<body>
<h1>Futriix Database Server</h1>
<p>Welcome to Futriix Database Server!</p>
<p>Static file serving is working correctly.</p>
<p>Current time: PLACEHOLDER_TIME</p>
<p>Requested path: PLACEHOLDER_PATH</p>
</body>
</html>
"#.replace("PLACEHOLDER_TIME", &chrono::Local::now().to_rfc2822())
.replace("PLACEHOLDER_PATH", path);
return Ok(Response::builder()
.header("Content-Type", "text/html; charset=utf-8")
.body(Body::from(fallback_html))
.unwrap());
}
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("File not found"))
.unwrap())
}
}
}
/// Determine the Content-Type from the file extension
fn get_content_type(file_path: &str) -> &'static str {
if file_path.ends_with(".html") {
"text/html; charset=utf-8"
} else if file_path.ends_with(".css") {
"text/css; charset=utf-8"
} else if file_path.ends_with(".js") {
"application/javascript; charset=utf-8"
} else if file_path.ends_with(".png") {
"image/png"
} else if file_path.ends_with(".jpg") || file_path.ends_with(".jpeg") {
"image/jpeg"
} else if file_path.ends_with(".json") {
"application/json; charset=utf-8"
} else if file_path.ends_with(".ico") {
"image/x-icon"
} else if file_path.ends_with(".svg") {
"image/svg+xml"
} else {
"text/plain; charset=utf-8"
}
}
/// Start the HTTP server with the wait-free architecture
pub async fn start_http_server(
addr: &str,
db: Arc<Database>,
static_config: StaticFilesConfig,
http_config: HttpConfig,
acl_config: AclConfig,
) -> Result<()> {
let addr_parsed: std::net::SocketAddr = addr.parse()
.map_err(|e: std::net::AddrParseError| crate::common::FutriixError::HttpError(e.to_string()))?;
let db_clone = db.clone();
let static_clone = static_config.clone();
let acl_clone = acl_config.clone();
// Build the wait-free service
let make_svc = make_service_fn(move |_conn| {
let db = db_clone.clone();
let static_config = static_clone.clone();
let acl_config = acl_clone.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let db = db.clone();
let static_config = static_config.clone();
let acl_config = acl_config.clone();
async move {
handle_request(req, db, static_config, acl_config).await
}
}))
}
});
println!("HTTP server starting on {}...", addr);
// Run the server and block on its future; this keeps the server alive for
// as long as the caller awaits it
let server = Server::bind(&addr_parsed).serve(make_svc);
if let Err(e) = server.await {
eprintln!("HTTP server error: {}", e);
return Err(crate::common::FutriixError::HttpError(e.to_string()));
}
Ok(())
}
/// Start the HTTPS server with the wait-free architecture
pub async fn start_https_server(
addr: &str,
db: Arc<Database>,
static_config: StaticFilesConfig,
tls_config: TlsConfig,
acl_config: AclConfig,
) -> Result<()> {
if !tls_config.enabled {
println!("HTTPS disabled: TLS not enabled");
return Ok(());
}
// Plain variant without TLS, for testing only; production code would build
// a rustls ServerConfig from cert_path/key_path here
println!("HTTPS server would start on {} (TLS configuration needed)", addr);
// Run a plain HTTP server on the HTTPS port for testing, reusing that port
// to avoid confusion
let http_config = HttpConfig {
enabled: true,
port: 8443, // Используем HTTPS порт для тестирования
http2_enabled: false,
};
// Create owned copies of all data for use inside the async move block
let owned_addr = addr.to_string();
let owned_db = db.clone();
let owned_static_config = static_config.clone();
let owned_acl_config = acl_config.clone();
// Launch the plain HTTP server on the HTTPS port for testing;
// this is a stopgap until TLS is configured
let server_future = async move {
start_http_server(&owned_addr, owned_db, owned_static_config, http_config, owned_acl_config).await
};
// Run the server in a separate task
tokio::spawn(async move {
if let Err(e) = server_future.await {
eprintln!("HTTPS (HTTP fallback) server error: {}", e);
}
});
Ok(())
}
// Helper function for logging
fn log_to_file(message: &str) {
use std::fs::OpenOptions;
use std::io::Write;
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open("futriix.log")
{
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
}
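// A minimal usage sketch (address and config values are hypothetical; note
// that start_http_server blocks on the listener, so callers usually spawn it):
//
//   let http_cfg = HttpConfig { enabled: true, port: 9090, http2_enabled: false };
//   tokio::spawn(async move {
//       if let Err(e) = start_http_server(
//           "127.0.0.1:9090", db, StaticFilesConfig::default(), http_cfg, AclConfig::default(),
//       ).await {
//           eprintln!("HTTP server error: {}", e);
//       }
//   });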

142
src/server/lua_engine.rs Executable file
View File

@ -0,0 +1,142 @@
// src/server/lua_engine.rs
//! Embedded Lua interpreter (based on rlua)
use rlua::{Lua, RluaCompat};
use std::sync::Arc;
use std::fs;
use std::path::Path;
use crate::common::Result;
use crate::common::protocol;
use crate::server::database::{Trigger, TriggerEvent, Index, IndexType};
/// Lua engine for executing scripts
#[derive(Clone)]
pub struct LuaEngine {
lua: Arc<Lua>,
}
impl LuaEngine {
pub fn new() -> Result<Self> {
let lua = Lua::new();
// Set up the Lua environment
lua.load(r#"
function futriix_log(message)
print("LUA: " .. message)
end
function futriix_error(message)
print("LUA ERROR: " .. message)
end
"#).exec()?;
Ok(Self { lua: Arc::new(lua) })
}
/// Execute a Lua script from a file
#[allow(dead_code)]
pub fn execute_script_file(&self, file_path: &str) -> Result<()> {
let script_content = fs::read_to_string(file_path)
.map_err(|e| crate::common::FutriixError::LuaError(format!("Failed to read script file {}: {}", file_path, e)))?;
self.execute_script(&script_content)
}
/// Execute a Lua script from a string
pub fn execute_script(&self, script: &str) -> Result<()> {
let lua = self.lua.clone();
lua.load(script).exec()?;
Ok(())
}
/// Execute all listed scripts from a directory
#[allow(dead_code)]
pub fn execute_scripts_from_dir(&self, dir_path: &str, script_names: &[String]) -> Result<()> {
let path = Path::new(dir_path);
if !path.exists() {
println!("Lua scripts directory does not exist: {}", path.display());
return Ok(());
}
if !path.is_dir() {
return Err(crate::common::FutriixError::LuaError(
format!("Lua scripts path is not a directory: {}", path.display())
));
}
for script_name in script_names {
let script_path = path.join(script_name);
if script_path.exists() && script_path.is_file() {
println!("Executing Lua script: {}", script_path.display());
match self.execute_script_file(script_path.to_str().unwrap()) {
Ok(_) => println!("✓ Script executed successfully: {}", script_name),
Err(e) => eprintln!("✗ Failed to execute script {}: {}", script_name, e),
}
} else {
println!("Script not found: {}", script_path.display());
}
}
Ok(())
}
/// Register database functions in the Lua environment
pub fn register_db_functions(
&self,
db: Arc<crate::server::database::Database>,
_sharding_manager: Arc<crate::server::sharding::ShardingManager>
) -> Result<()> {
let lua = self.lua.clone();
// Create the table of DB functions
let futriix_db = lua.create_table()?;
// Basic CRUD functions
let db_clone = db.clone();
futriix_db.set("create", lua.create_function(move |_, (collection, data): (String, String)| {
let command = protocol::Command::Create {
collection,
document: data.into_bytes(),
};
match db_clone.execute_command(command) {
Ok(_) => Ok(()),
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
let db_clone = db.clone();
futriix_db.set("read", lua.create_function(move |_, (collection, id): (String, String)| {
let command = protocol::Command::Read {
collection,
id,
};
match db_clone.execute_command(command) {
Ok(response) => {
match response {
protocol::Response::Success(data) => {
Ok(String::from_utf8_lossy(&data).to_string())
}
protocol::Response::Error(e) => {
Err(rlua::Error::RuntimeError(e))
}
}
}
Err(e) => Err(rlua::Error::RuntimeError(e.to_string())),
}
})?)?;
// Expose the table in the global namespace
lua.globals().set("futriix_db", futriix_db)?;
Ok(())
}
/// Directory containing the Lua scripts
fn lua_scripts_dir(&self) -> &'static str {
"lua_scripts"
}
}
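// A minimal usage sketch (the Lua payload is illustrative):
//
//   let engine = LuaEngine::new()?;
//   engine.register_db_functions(db.clone(), sharding_manager.clone())?;
//   engine.execute_script(r#"
//       futriix_db.create("users", '{"name": "alice"}')
//       futriix_log("user document created")
//   "#)?;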

158
src/server/main.rs Executable file
View File

@ -0,0 +1,158 @@
// src/server/main.rs
//! Main module of the Futriix server
//!
//! Application entry point: initializes the server and runs it.
//! Uses a wait-free architecture with lock-free data structures.
mod common;
mod server;
mod lua_shell;
use std::env;
use std::fs::OpenOptions;
use std::io::Write;
use crate::common::FutriixError;
/// Log a message to the server log file
fn log_to_file(message: &str) {
match OpenOptions::new()
.create(true)
.append(true)
.open("futriix.log")
{
Ok(mut file) => {
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
Err(e) => eprintln!("Failed to write to log file: {}", e),
}
}
/// Simple container for command-line arguments
struct Args {
config: String,
debug: bool,
http_port: Option<u16>,
https_port: Option<u16>,
host: Option<String>,
}
/// Simple command-line argument parser
fn parse_args() -> Args {
let mut args = Args {
config: "config.toml".to_string(),
debug: false,
http_port: None,
https_port: None,
host: None,
};
let mut iter = env::args().skip(1);
while let Some(arg) = iter.next() {
match arg.as_str() {
"--config" | "-c" => {
if let Some(value) = iter.next() {
args.config = value;
}
}
"--debug" | "-d" => {
args.debug = true;
}
"--http-port" => {
if let Some(value) = iter.next() {
if let Ok(port) = value.parse() {
args.http_port = Some(port);
}
}
}
"--https-port" => {
if let Some(value) = iter.next() {
if let Ok(port) = value.parse() {
args.https_port = Some(port);
}
}
}
"--host" => {
if let Some(value) = iter.next() {
args.host = Some(value);
}
}
_ => {
if arg.starts_with("--config=") {
args.config = arg.trim_start_matches("--config=").to_string();
} else if arg.starts_with("-c=") {
args.config = arg.trim_start_matches("-c=").to_string();
}
}
}
}
args
}
/// Print text with an ANSI color
fn print_colored(text: &str, ansi_color: &str) {
println!("{}{}\x1b[0m", ansi_color, text);
}
/// Convert a HEX color to an ANSI escape code
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
#[tokio::main]
async fn main() -> Result<(), FutriixError> {
// Initialize file logging
log_to_file("Starting Futriix server");
// Print the welcome banner in color #00bfff before loading the configuration
let color_code = hex_to_ansi("#00bfff");
println!(); // blank line before the banner
print_colored("Futriix Database Server", &color_code);
print_colored("futriix 3i²(by 26.11.2025)", &color_code);
println!(); // blank line after the banner
// Parse command-line arguments
let args = parse_args();
let config_path = args.config;
let message = format!("Loading configuration from: {}", config_path);
println!("{}", message);
log_to_file(&message);
// Create and run the server
match server::FutriixServer::new(&config_path).await {
Ok(server) => {
log_to_file("Server created successfully");
if let Err(e) = server.run().await {
let error_message = format!("Server error: {}", e);
eprintln!("{}", error_message);
log_to_file(&error_message);
std::process::exit(1);
}
}
Err(e) => {
let error_message = format!("Failed to create server: {}", e);
eprintln!("{}", error_message);
log_to_file(&error_message);
std::process::exit(1);
}
}
log_to_file("Futriix server stopped");
Ok(())
}

328
src/server/mod.rs Executable file
View File

@ -0,0 +1,328 @@
// src/server/mod.rs
//! Futriix server: a document-oriented DB with a wait-free architecture
//!
//! Core server module implementing wait-free data access,
//! synchronous master-master replication, and HTTP/HTTPS support.
#![allow(dead_code)]
use std::sync::Arc;
use std::fs::OpenOptions;
use std::io::Write;
use crate::common::Result;
use crate::common::config::Config;
use crate::lua_shell::LuaShell;
// Submodules
pub mod database;
pub mod lua_engine;
pub mod http;
pub mod sharding; // combined sharding and replication module
pub mod csv_import_export; // CSV import/export module
/// Log a message to the server log file
fn log_to_file(message: &str) {
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open("futriix.log")
{
let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
let log_message = format!("[{}] {}\n", timestamp, message);
let _ = file.write_all(log_message.as_bytes());
}
}
/// Print text with an ANSI color
#[allow(dead_code)]
fn print_colored(text: &str, ansi_color: &str) {
println!("{}{}\x1b[0m", ansi_color, text);
}
/// Convert a HEX color to an ANSI escape code
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
/// The main Futriix server with a wait-free architecture
pub struct FutriixServer {
config: Config,
database: Arc<database::Database>,
lua_engine: lua_engine::LuaEngine,
sharding_manager: Arc<sharding::ShardingManager>, // combined sharding/replication manager
http_enabled: bool,
csv_manager: Arc<csv_import_export::CsvManager>,
}
impl FutriixServer {
/// Create a new server with the wait-free architecture
pub async fn new(config_path: &str) -> Result<Self> {
// Load the configuration
let config = Config::load(config_path)?;
// Initialize the components with wait-free approaches
let database = Arc::new(database::Database::new());
let lua_engine = lua_engine::LuaEngine::new()?;
// Initialize the combined sharding and replication manager
let sharding_manager = Arc::new(sharding::ShardingManager::new(
160, // virtual_nodes_per_node
config.replication.enabled,
));
// Initialize the CSV manager
let csv_manager = Arc::new(csv_import_export::CsvManager::new(
database.clone(),
config.csv.clone(),
));
// Register the DB functions in Lua
lua_engine.register_db_functions(database.clone(), sharding_manager.clone())?;
// Initialize the database
FutriixServer::initialize_database(database.clone())?;
// Check whether HTTP mode is enabled (honoring the new http/https directives)
let http_enabled = (config.server.http_port.is_some() && config.server.http) ||
(config.server.https_port.is_some() && config.server.https);
Ok(Self {
config,
database,
lua_engine,
sharding_manager,
http_enabled,
csv_manager,
})
}
/// Initialize the database with wait-free structures
fn initialize_database(db: Arc<database::Database>) -> Result<()> {
// Create the system collections with wait-free access
let _system_collection = db.get_collection("_system");
let _users_collection = db.get_collection("_users");
let _logs_collection = db.get_collection("_logs");
let _procedures_collection = db.get_collection("_procedures");
let _triggers_collection = db.get_collection("_triggers");
let _csv_imports_collection = db.get_collection("_csv_imports");
// Create the backup directory
let backup_dir = "/futriix/backups";
if let Err(e) = std::fs::create_dir_all(backup_dir) {
// Fall back to the current directory
let current_backup_dir = "./futriix_backups";
if let Err(e2) = std::fs::create_dir_all(current_backup_dir) {
eprintln!("Warning: Failed to create backup directory '{}': {}", backup_dir, e);
eprintln!("Warning: Also failed to create fallback directory '{}': {}", current_backup_dir, e2);
} else {
println!("Backup directory created at: {}", current_backup_dir);
}
} else {
println!("Backup directory created at: {}", backup_dir);
}
// Create the CSV directory
let csv_dir = "/futriix/csv";
if let Err(e) = std::fs::create_dir_all(csv_dir) {
// Fall back to the current directory
let current_csv_dir = "./futriix_csv";
if let Err(e2) = std::fs::create_dir_all(current_csv_dir) {
eprintln!("Warning: Failed to create CSV directory '{}': {}", csv_dir, e);
eprintln!("Warning: Also failed to create fallback directory '{}': {}", current_csv_dir, e2);
} else {
println!("CSV directory created at: {}", current_csv_dir);
}
} else {
println!("CSV directory created at: {}", csv_dir);
}
// Create the static files directory
let static_dir = "static";
if let Err(e) = std::fs::create_dir_all(static_dir) {
eprintln!("Warning: Failed to create static files directory '{}': {}", static_dir, e);
} else {
println!("Static files directory created at: {}", static_dir);
}
// Create a simple index.html for testing
let index_html_content = r#"<!DOCTYPE html>
<html>
<head>
<title>Futriix Database Server</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
h1 { color: #00bfff; }
.status { padding: 10px; background: #f0f0f0; border-radius: 5px; }
</style>
</head>
<body>
<h1>Futriix Database Server</h1>
<div class="status">
<p>Server is running successfully!</p>
<p>This is a test page to verify HTTP server functionality.</p>
<p>Current time: <span id="time"></span></p>
</div>
<script>
document.getElementById('time').textContent = new Date().toLocaleString();
</script>
</body>
</html>"#;
if let Err(e) = std::fs::write("static/index.html", index_html_content) {
eprintln!("Warning: Failed to create index.html: {}", e);
} else {
println!("Created test index.html in static directory");
}
let message = "Database initialized with system collections";
println!("{}", message);
log_to_file(message);
Ok(())
}
/// Run the server with the wait-free architecture
pub async fn run(&self) -> Result<()> {
// Determine the operating mode and cluster name
let cluster_name = &self.config.cluster.name;
println!("Mode: cluster (cluster: '{}')", cluster_name);
log_to_file("Futriix Database Server started");
log_to_file(&format!("Mode: cluster (cluster: '{}')", cluster_name));
// Launch the HTTP/HTTPS servers in background tasks, if configured
if self.http_enabled {
// Spawn the servers as background tasks so the main thread is not blocked
self.start_http_servers_in_background().await?;
} else {
println!("HTTP/HTTPS servers disabled in configuration");
}
// Blank line after the server info
println!();
let mut lua_shell = LuaShell::new(
self.lua_engine.clone(),
self.database.clone(),
self.sharding_manager.clone(),
self.csv_manager.clone(),
);
// Run the interactive shell: this is the main execution path
lua_shell.run().await?;
Ok(())
}
/// Launch the HTTP/HTTPS servers as background tasks (non-blocking)
async fn start_http_servers_in_background(&self) -> Result<()> {
let static_config = self::http::StaticFilesConfig::default();
let acl_config = self::http::AclConfig {
enabled: self.config.acl.enabled,
allowed_ips: self.config.acl.allowed_ips.clone(),
denied_ips: self.config.acl.denied_ips.clone(),
};
// Start the HTTP server if configured and enabled
if let Some(http_port) = self.config.server.http_port {
if self.config.server.http {
let http_addr = format!("{}:{}", self.config.server.host, http_port);
let http_config = self::http::HttpConfig {
enabled: true,
port: http_port,
http2_enabled: self.config.server.http2_enabled.unwrap_or(false),
};
let db_clone = self.database.clone();
let static_config_clone = static_config.clone();
let acl_config_clone = acl_config.clone();
// Spawn as a background task without awaiting
tokio::spawn(async move {
println!("Starting HTTP server on {}...", http_addr);
match self::http::start_http_server(&http_addr, db_clone, static_config_clone, http_config, acl_config_clone).await {
Ok(_) => {
let message = format!("HTTP server started on {}", http_addr);
println!("{}", message);
log_to_file(&message);
}
Err(e) => {
let message = format!("Failed to start HTTP server: {}", e);
eprintln!("{}", message);
log_to_file(&message);
}
}
});
} else {
println!("HTTP server disabled in configuration");
}
}
// Start the HTTPS server if configured and enabled
if let Some(https_port) = self.config.server.https_port {
if self.config.server.https && self.config.tls.enabled {
let https_addr = format!("{}:{}", self.config.server.host, https_port);
let tls_config = self::http::TlsConfig {
enabled: self.config.tls.enabled,
cert_path: self.config.tls.cert_path.clone(),
key_path: self.config.tls.key_path.clone(),
};
let db_clone = self.database.clone();
let static_config_clone = static_config.clone();
let acl_config_clone = acl_config.clone();
// Spawn as a background task without awaiting
tokio::spawn(async move {
println!("Starting HTTPS server on {}...", https_addr);
match self::http::start_https_server(&https_addr, db_clone, static_config_clone, tls_config, acl_config_clone).await {
Ok(_) => {
let message = format!("HTTPS server started on {}", https_addr);
println!("{}", message);
log_to_file(&message);
}
Err(e) => {
let message = format!("Failed to start HTTPS server: {}", e);
eprintln!("{}", message);
log_to_file(&message);
}
}
});
} else if !self.config.tls.enabled {
println!("HTTPS disabled: TLS not enabled in configuration");
} else {
println!("HTTPS server disabled in configuration");
}
}
Ok(())
}
/// Returns the sharding manager (for tests and extensions).
#[allow(dead_code)]
pub fn get_sharding_manager(&self) -> Arc<sharding::ShardingManager> {
self.sharding_manager.clone()
}
/// Returns the CSV manager (for tests and extensions).
#[allow(dead_code)]
pub fn get_csv_manager(&self) -> Arc<csv_import_export::CsvManager> {
self.csv_manager.clone()
}
}

602
src/server/sharding.rs Executable file

@ -0,0 +1,602 @@
// src/server/sharding.rs
//! Sharding module with consistent hashing and the Raft protocol
//!
//! Combines sharding and replication with a wait-free architecture
//! and a Raft consensus implementation intended for production use.
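//!
//! A minimal usage sketch (the node addresses and collection names are
//! illustrative assumptions; marked `ignore` so it is not doc-tested):
//!
//! ```ignore
//! let manager = ShardingManager::new(16, true);
//! manager.add_node("node1".into(), "node1.futriix:9090".into(), 1024)?;
//! manager.add_node("node2".into(), "node2.futriix:9090".into(), 1024)?;
//! manager.setup_collection_sharding("users", "user_id")?;
//! let target = manager.find_node_for_key("users", "user-42")?;
//! ```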
use std::collections::{HashMap, BTreeMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
use tokio::io::AsyncWriteExt;
use serde::{Serialize, Deserialize};
use siphasher::sip::SipHasher13;
use dashmap::DashMap;
use crate::common::Result;
use crate::common::protocol;
/// Node states in the Raft protocol.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RaftState {
Follower,
Candidate,
Leader,
}
/// Information about a Raft node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftNode {
pub node_id: String,
pub address: String,
pub state: RaftState,
pub term: u64,
pub voted_for: Option<String>,
pub last_heartbeat: i64,
}
/// Information about a shard node, including its Raft state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardNode {
pub node_id: String,
pub address: String,
pub capacity: u64,
pub used: u64,
pub collections: Vec<String>,
pub raft_info: RaftNode,
}
/// Sharding state for a collection.
#[derive(Debug, Clone)]
pub struct CollectionSharding {
pub shard_key: String,
pub virtual_nodes: usize,
pub ring: BTreeMap<u64, String>, // consistent hash ring
}
/// Replication events.
#[derive(Debug, Serialize, Deserialize)]
pub enum ReplicationEvent {
Command(protocol::Command),
SyncRequest,
Heartbeat,
RaftVoteRequest { term: u64, candidate_id: String },
RaftVoteResponse { term: u64, vote_granted: bool },
RaftAppendEntries { term: u64, leader_id: String },
}
/// Sharding and replication manager with Raft.
#[derive(Clone)]
pub struct ShardingManager {
// Sharding components
nodes: Arc<DashMap<String, ShardNode>>, // lock-free node storage
collections: Arc<DashMap<String, CollectionSharding>>, // lock-free collection storage
virtual_nodes_per_node: usize,
// Raft components
current_term: Arc<AtomicU64>, // current Raft term
voted_for: Arc<DashMap<u64, String>>, // votes cast per term
is_leader: Arc<AtomicBool>, // leader flag
cluster_formed: Arc<AtomicBool>, // whether the cluster has formed
// Replication components
replication_tx: Arc<mpsc::Sender<ReplicationEvent>>,
sequence_number: Arc<AtomicU64>,
replication_enabled: Arc<AtomicBool>,
}
impl ShardingManager {
/// Creates a new sharding and replication manager.
pub fn new(virtual_nodes_per_node: usize, replication_enabled: bool) -> Self {
let (tx, rx) = mpsc::channel(1000); // bounded channel for replication events
let manager = Self {
nodes: Arc::new(DashMap::new()),
collections: Arc::new(DashMap::new()),
virtual_nodes_per_node,
current_term: Arc::new(AtomicU64::new(0)),
voted_for: Arc::new(DashMap::new()),
is_leader: Arc::new(AtomicBool::new(false)),
cluster_formed: Arc::new(AtomicBool::new(false)),
replication_tx: Arc::new(tx),
sequence_number: Arc::new(AtomicU64::new(0)),
replication_enabled: Arc::new(AtomicBool::new(replication_enabled)),
};
// Spawn the background task that drives replication and Raft
let manager_clone = manager.clone();
tokio::spawn(async move {
manager_clone.run_replication_loop(rx).await;
});
manager
}
/// Background task that processes replication and Raft events.
async fn run_replication_loop(self, mut rx: mpsc::Receiver<ReplicationEvent>) {
let mut heartbeat_interval = interval(Duration::from_millis(1000));
loop {
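// Note: with the `Some(event) = rx.recv()` pattern below, the branch is
// disabled once every sender has been dropped (recv() returns None),
// and the loop then only services the heartbeat interval.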
tokio::select! {
Some(event) = rx.recv() => {
self.handle_replication_event(event).await;
}
_ = heartbeat_interval.tick() => {
if self.is_leader.load(Ordering::SeqCst) && self.replication_enabled.load(Ordering::SeqCst) {
let _ = self.send_heartbeat().await;
}
}
}
}
}
/// Handles a replication event.
async fn handle_replication_event(&self, event: ReplicationEvent) {
if !self.replication_enabled.load(Ordering::SeqCst) {
return;
}
match event {
ReplicationEvent::Command(cmd) => {
self.replicate_command(cmd).await;
}
ReplicationEvent::SyncRequest => {
self.sync_with_nodes().await;
}
ReplicationEvent::Heartbeat => {
let _ = self.send_heartbeat().await;
}
ReplicationEvent::RaftVoteRequest { term, candidate_id } => {
self.handle_vote_request(term, candidate_id).await;
}
ReplicationEvent::RaftVoteResponse { term, vote_granted } => {
self.handle_vote_response(term, vote_granted).await;
}
ReplicationEvent::RaftAppendEntries { term, leader_id } => {
self.handle_append_entries(term, leader_id).await;
}
}
}
/// Replicates a command to the other nodes.
async fn replicate_command(&self, command: protocol::Command) {
let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
for entry in self.nodes.iter() {
let node = entry.value();
if node.raft_info.state != RaftState::Leader {
let node_addr = node.address.clone();
let cmd_clone = command.clone();
let seq_clone = sequence;
tokio::spawn(async move {
if let Err(e) = Self::send_command_to_node(&node_addr, &cmd_clone, seq_clone).await {
eprintln!("Failed to replicate to {}: {}", node_addr, e);
}
});
}
}
}
/// Sends a command to a remote node.
async fn send_command_to_node(node: &str, command: &protocol::Command, sequence: u64) -> Result<()> {
let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream,
Err(e) => {
eprintln!("Failed to connect to {}: {}", node, e);
return Ok(());
}
};
let message = protocol::ReplicationMessage {
sequence,
command: command.clone(),
timestamp: chrono::Utc::now().timestamp(),
};
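// Assumption: protocol::serialize yields a self-delimiting frame (e.g.
// the rmp/MessagePack encoding this crate depends on); otherwise a
// length prefix would be needed for the receiver to split messages.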
let bytes = protocol::serialize(&message)?;
if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send command to {}: {}", node, e);
}
Ok(())
}
/// Synchronizes with the other nodes.
async fn sync_with_nodes(&self) {
println!("Starting sync with {} nodes", self.nodes.len());
for entry in self.nodes.iter() {
let node_addr = entry.value().address.clone();
tokio::spawn(async move {
if let Err(e) = Self::sync_with_node(&node_addr).await {
eprintln!("Failed to sync with {}: {}", node_addr, e);
}
});
}
}
/// Synchronizes with a remote node.
async fn sync_with_node(_node: &str) -> Result<()> {
// TODO: implement wait-free data synchronization
Ok(())
}
/// Sends heartbeats to follower nodes.
async fn send_heartbeat(&self) -> Result<()> {
for entry in self.nodes.iter() {
let node = entry.value();
if node.raft_info.state == RaftState::Follower {
let node_addr = node.address.clone();
tokio::spawn(async move {
if let Err(e) = Self::send_heartbeat_to_node(&node_addr).await {
eprintln!("Heartbeat failed for {}: {}", node_addr, e);
}
});
}
}
Ok(())
}
/// Sends a heartbeat to a remote node.
async fn send_heartbeat_to_node(node: &str) -> Result<()> {
let mut stream = match tokio::net::TcpStream::connect(node).await {
Ok(stream) => stream,
Err(e) => {
eprintln!("Failed to connect to {} for heartbeat: {}", node, e);
return Ok(());
}
};
let heartbeat = protocol::ReplicationMessage {
sequence: 0,
command: protocol::Command::CallProcedure { name: "heartbeat".to_string() },
timestamp: chrono::Utc::now().timestamp(),
};
let bytes = protocol::serialize(&heartbeat)?;
if let Err(e) = stream.write_all(&bytes).await {
eprintln!("Failed to send heartbeat to {}: {}", node, e);
}
Ok(())
}
// Raft methods
/// Handles a vote request.
async fn handle_vote_request(&self, term: u64, candidate_id: String) {
let current_term = self.current_term.load(Ordering::SeqCst);
if term > current_term {
self.current_term.store(term, Ordering::SeqCst);
self.voted_for.insert(term, candidate_id.clone());
// TODO: send a positive response
}
// TODO: send a negative response when the conditions are not met
}
/// Handles a vote response.
async fn handle_vote_response(&self, term: u64, vote_granted: bool) {
if vote_granted && term == self.current_term.load(Ordering::SeqCst) {
// TODO: count the votes and become leader on a majority
}
}
/// Handles an AppendEntries RPC.
async fn handle_append_entries(&self, term: u64, leader_id: String) {
let current_term = self.current_term.load(Ordering::SeqCst);
if term >= current_term {
self.current_term.store(term, Ordering::SeqCst);
self.is_leader.store(false, Ordering::SeqCst);
// TODO: update the follower state
}
}
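/// Quorum size for Raft majorities: a minimal sketch added to support the
/// vote-counting TODOs above (`n / 2 + 1` of the currently known nodes).
/// The name `majority_quorum` is illustrative; the election logic does not
/// call it yet.
#[allow(dead_code)]
fn majority_quorum(&self) -> usize {
self.nodes.len() / 2 + 1
}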
// Sharding methods
/// Adds a shard node along with its Raft information.
pub fn add_node(&self, node_id: String, address: String, capacity: u64) -> Result<()> {
let raft_node = RaftNode {
node_id: node_id.clone(),
address: address.clone(),
state: RaftState::Follower,
term: 0,
voted_for: None,
last_heartbeat: chrono::Utc::now().timestamp(),
};
let node = ShardNode {
node_id: node_id.clone(),
address,
capacity,
used: 0,
collections: Vec::new(),
raft_info: raft_node,
};
self.nodes.insert(node_id, node);
// Check whether the cluster has formed (sharding needs at least 2 nodes)
if self.nodes.len() >= 2 {
self.cluster_formed.store(true, Ordering::SeqCst);
println!("Cluster formed with {} nodes", self.nodes.len());
}
Ok(())
}
/// Removes a shard node.
pub fn remove_node(&self, node_id: &str) -> Result<()> {
self.nodes.remove(node_id);
// Re-check cluster formation after the removal
if self.nodes.len() < 2 {
self.cluster_formed.store(false, Ordering::SeqCst);
self.is_leader.store(false, Ordering::SeqCst);
}
Ok(())
}
/// Configures sharding for a collection.
pub fn setup_collection_sharding(&self, collection: &str, shard_key: &str) -> Result<()> {
// Ensure the cluster has formed before configuring sharding
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot setup sharding: cluster not formed. Need at least 2 nodes.".to_string()
));
}
let sharding = CollectionSharding {
shard_key: shard_key.to_string(),
virtual_nodes: self.virtual_nodes_per_node,
ring: BTreeMap::new(),
};
self.collections.insert(collection.to_string(), sharding);
self.rebuild_ring(collection)?;
Ok(())
}
/// Rebuilds the hash ring for a collection.
fn rebuild_ring(&self, collection: &str) -> Result<()> {
if let Some(mut sharding) = self.collections.get_mut(collection) {
sharding.ring.clear();
for entry in self.nodes.iter() {
let node_id = entry.key();
for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i);
let hash = self.hash_key(&key);
sharding.ring.insert(hash, node_id.clone());
}
}
}
Ok(())
}
/// Hashes a key.
fn hash_key(&self, key: &str) -> u64 {
let mut hasher = SipHasher13::new();
key.hash(&mut hasher);
hasher.finish()
}
/// Finds the node responsible for a key.
pub fn find_node_for_key(&self, collection: &str, key_value: &str) -> Result<Option<String>> {
// Ensure the cluster has formed before looking up a node
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot find node: cluster not formed. Need at least 2 nodes.".to_string()
));
}
if let Some(sharding) = self.collections.get(collection) {
let key_hash = self.hash_key(key_value);
// Search the hash ring (consistent hashing)
let mut range = sharding.ring.range(key_hash..);
if let Some((_, node_id)) = range.next() {
return Ok(Some(node_id.clone()));
}
// If nothing is found above the key hash, wrap around to the first node in the ring
if let Some((_, node_id)) = sharding.ring.iter().next() {
return Ok(Some(node_id.clone()));
}
}
Ok(None)
}
/// Migrates a shard.
pub fn migrate_shard(&self, collection: &str, from_node: &str, to_node: &str, shard_key: &str) -> Result<()> {
// Ensure the cluster has formed before migrating
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot migrate shard: cluster not formed. Need at least 2 nodes.".to_string()
));
}
println!("Migrating shard for collection '{}' from {} to {} with key {}",
collection, from_node, to_node, shard_key);
self.rebuild_ring(collection)?;
Ok(())
}
/// Rebalances the cluster.
pub fn rebalance_cluster(&self) -> Result<()> {
// Ensure the cluster has formed before rebalancing
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cannot rebalance cluster: cluster not formed. Need at least 2 nodes.".to_string()
));
}
println!("Rebalancing cluster with {} nodes", self.nodes.len());
// Rebuild every hash ring
for mut entry in self.collections.iter_mut() {
let sharding = entry.value_mut();
sharding.ring.clear();
for node_entry in self.nodes.iter() {
let node_id = node_entry.key();
for i in 0..sharding.virtual_nodes {
let key = format!("{}-{}", node_id, i);
let hash = self.hash_key(&key);
sharding.ring.insert(hash, node_id.clone());
}
}
}
// Rebalance the cluster nodes
self.rebalance_nodes()?;
Ok(())
}
/// Rebalances the nodes in the cluster.
fn rebalance_nodes(&self) -> Result<()> {
println!("Rebalancing nodes in cluster...");
// Compute the average utilization
let total_capacity: u64 = self.nodes.iter().map(|entry| entry.value().capacity).sum();
let total_used: u64 = self.nodes.iter().map(|entry| entry.value().used).sum();
let avg_usage = if total_capacity > 0 { total_used as f64 / total_capacity as f64 } else { 0.0 };
println!("Cluster usage: {:.2}% ({} / {})", avg_usage * 100.0, total_used, total_capacity);
// TODO: implement the node rebalancing algorithm:
// - identify overloaded and underloaded nodes (see the sketch below)
// - redistribute data between nodes
// - update the sharding metadata
Ok(())
}
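/// Sketch for the rebalancing TODO above: split nodes into overloaded and
/// underloaded sets relative to the average utilization. The 10% band and
/// the method name are illustrative assumptions, not part of the original
/// design; the actual data movement remains unimplemented.
#[allow(dead_code)]
fn classify_nodes(&self, avg_usage: f64) -> (Vec<String>, Vec<String>) {
let mut overloaded = Vec::new();
let mut underloaded = Vec::new();
for entry in self.nodes.iter() {
let node = entry.value();
let usage = if node.capacity > 0 {
node.used as f64 / node.capacity as f64
} else {
0.0
};
// Nodes more than 10% above the mean are donors, 10% below are receivers.
if usage > avg_usage + 0.1 {
overloaded.push(node.node_id.clone());
} else if usage < avg_usage - 0.1 {
underloaded.push(node.node_id.clone());
}
}
(overloaded, underloaded)
}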
/// Returns the cluster status.
pub fn get_cluster_status(&self) -> Result<protocol::ClusterStatus> {
let mut cluster_nodes = Vec::new();
let mut total_capacity = 0;
let mut total_used = 0;
let mut raft_nodes = Vec::new();
for entry in self.nodes.iter() {
let node = entry.value();
total_capacity += node.capacity;
total_used += node.used;
cluster_nodes.push(protocol::ShardInfo {
node_id: node.node_id.clone(),
address: node.address.clone(),
capacity: node.capacity,
used: node.used,
collections: node.collections.clone(),
});
raft_nodes.push(protocol::RaftNodeInfo {
node_id: node.node_id.clone(),
address: node.address.clone(),
state: match node.raft_info.state {
RaftState::Leader => "leader".to_string(),
RaftState::Follower => "follower".to_string(),
RaftState::Candidate => "candidate".to_string(),
},
term: node.raft_info.term,
last_heartbeat: node.raft_info.last_heartbeat,
});
}
Ok(protocol::ClusterStatus {
nodes: cluster_nodes,
total_capacity,
total_used,
rebalance_needed: false,
cluster_formed: self.cluster_formed.load(Ordering::SeqCst),
leader_exists: self.is_leader.load(Ordering::SeqCst),
raft_nodes,
})
}
/// Returns the list of Raft nodes.
pub fn get_raft_nodes(&self) -> Vec<RaftNode> {
self.nodes.iter()
.map(|entry| entry.value().raft_info.clone())
.collect()
}
/// Reports whether the cluster has formed.
pub fn is_cluster_formed(&self) -> bool {
self.cluster_formed.load(Ordering::SeqCst)
}
/// Starts a Raft election campaign.
pub fn start_election(&self) -> Result<()> {
if !self.cluster_formed.load(Ordering::SeqCst) {
return Err(crate::common::FutriixError::DatabaseError(
"Cluster not formed. Need at least 2 nodes.".to_string()
));
}
let new_term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
println!("Starting election for term {}", new_term);
// Transition to the candidate state
self.is_leader.store(false, Ordering::SeqCst);
// TODO: implement the full Raft election logic:
// - send RequestVote RPCs to the other nodes
// - collect the votes
// - transition to Leader once a majority is collected (cf. the `majority_quorum` sketch above)
Ok(())
}
/// Submits a command for replication.
pub async fn replicate(&self, command: protocol::Command) -> Result<()> {
if !self.replication_enabled.load(Ordering::SeqCst) {
return Ok(());
}
self.replication_tx.send(ReplicationEvent::Command(command)).await
.map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string()))
}
/// Requests synchronization with the other nodes.
pub async fn request_sync(&self) -> Result<()> {
if !self.replication_enabled.load(Ordering::SeqCst) {
return Ok(());
}
self.replication_tx.send(ReplicationEvent::SyncRequest).await
.map_err(|e| crate::common::FutriixError::ReplicationError(e.to_string()))
}
/// Returns the list of replication nodes.
pub fn get_nodes(&self) -> Vec<ShardNode> {
self.nodes.iter()
.map(|entry| entry.value().clone())
.collect()
}
/// Returns the current sequence number.
pub fn get_sequence_number(&self) -> u64 {
self.sequence_number.load(Ordering::SeqCst)
}
/// Reports whether replication is enabled.
pub fn is_replication_enabled(&self) -> bool {
self.replication_enabled.load(Ordering::SeqCst)
}
/// Returns information about a node.
pub fn get_node(&self, node_id: &str) -> Option<ShardNode> {
self.nodes.get(node_id).map(|entry| entry.value().clone())
}
}
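// A minimal test sketch (not part of the original file): exercises node
// registration, cluster-formation gating, and consistent-hash lookup.
// Node and collection names are illustrative; uses the tokio dev-dependency
// already declared in Cargo.toml.
#[cfg(test)]
mod sharding_tests {
use super::*;

#[tokio::test]
async fn find_node_for_key_requires_formed_cluster() {
let manager = ShardingManager::new(8, false);
// A single node does not form a cluster, so lookups must fail.
assert!(manager.add_node("n1".into(), "127.0.0.1:9101".into(), 100).is_ok());
assert!(manager.find_node_for_key("users", "alice").is_err());
// A second node forms the cluster; the ring then resolves every key.
assert!(manager.add_node("n2".into(), "127.0.0.1:9102".into(), 100).is_ok());
assert!(manager.is_cluster_formed());
assert!(manager.setup_collection_sharding("users", "user_id").is_ok());
let node = manager.find_node_for_key("users", "alice").ok().flatten();
assert!(matches!(node.as_deref(), Some("n1") | Some("n2")));
}
}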