Upload files to "/"

This commit is contained in:
Григорий Сафронов 2025-12-08 15:21:43 +00:00
parent 99a0faef62
commit 7b9c70a36f

690
protocol.rs Normal file
View File

@ -0,0 +1,690 @@
// src/common/protocol.rs
//! Протокол обмена данными для Futriix с wait-free архитектурой
//!
//! Этот модуль определяет структуры команд и ответов для взаимодействия между
//! компонентами системы Futriix. Используется для сериализации и десериализации
//! сообщений между клиентами и сервером, а также между узлами кластера.
//!
//! Основные компоненты:
//! - Command: Перечисление всех поддерживаемых команд базы данных
//! - Response: Перечисление возможных ответов от сервера
//! - ReplicationMessage: Структура для репликации команд между узлами
//! - Шардинг и кластерные структуры (ShardInfo, ClusterStatus, RaftNodeInfo)
//! - Функции сериализации/десериализации с использованием MessagePack
//!
//! Особенности:
//! - Wait-free сериализация с использованием MessagePack (rmp-serde)
//! - Поддержка всех операций CRUD, транзакций, индексов и шардинга
//! - Типизированные структуры для строгой проверки данных
//! - Поддержка репликации с последовательными номерами и временными метками
#![allow(dead_code)] // Разрешаем неиспользуемый код для будущего расширения
use serde::{Deserialize, Serialize};
use crate::server::database::Index; // Тип индекса из модуля базы данных
/// Команды для выполнения в базе данных Futriix
/// Используются для передачи операций от клиентов к серверу и между узлами кластера.
/// Каждая команда содержит все необходимые данные для выполнения соответствующей операции.
/// Сериализуется в MessagePack для компактной передачи по сети.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
/// Создание нового документа в указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции, в которую будет добавлен документ
/// * `document` - Сериализованный документ в формате JSON (Vec<u8>)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "document": {"name": "Alice", "age": 30}
/// }
/// ```
Create {
collection: String,
document: Vec<u8>,
},
/// Чтение документа по идентификатору из указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции, из которой будет прочитан документ
/// * `id` - Уникальный идентификатор документа (обычно UUID)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "id": "550e8400-e29b-41d4-a716-446655440000"
/// }
/// ```
Read {
collection: String,
id: String,
},
/// Обновление существующего документа по идентификатору
///
/// # Поля
/// * `collection` - Имя коллекции, содержащей документ
/// * `id` - Уникальный идентификатор обновляемого документа
/// * `document` - Новая версия документа в формате JSON (Vec<u8>)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "id": "550e8400-e29b-41d4-a716-446655440000",
/// "document": {"name": "Alice", "age": 31}
/// }
/// ```
Update {
collection: String,
id: String,
document: Vec<u8>,
},
/// Удаление документа по идентификатору из указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции, из которой будет удален документ
/// * `id` - Уникальный идентификатор удаляемого документа
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "id": "550e8400-e29b-41d4-a716-446655440000"
/// }
/// ```
Delete {
collection: String,
id: String,
},
/// Запрос документов из коллекции с применением фильтра
///
/// # Поля
/// * `collection` - Имя коллекции для поиска
/// * `filter` - Сериализованный фильтр в формате JSON (Vec<u8>)
/// Если пустой вектор - возвращаются все документы
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "filter": {"age": {"$gt": 25}}
/// }
/// ```
Query {
collection: String,
filter: Vec<u8>,
},
/// Создание хранимой процедуры (Lua скрипта) на сервере
///
/// # Поля
/// * `name` - Уникальное имя процедуры для последующего вызова
/// * `code` - Lua код процедуры в виде вектора байтов
///
/// # Пример использования
/// ```json
/// {
/// "name": "calculate_stats",
/// "code": "function calculate() return 42 end"
/// }
/// ```
CreateProcedure {
name: String,
code: Vec<u8>,
},
/// Вызов ранее созданной хранимой процедуры
///
/// # Поля
/// * `name` - Имя процедуры для вызова
///
/// # Пример использования
/// ```json
/// {
/// "name": "calculate_stats"
/// }
/// ```
CallProcedure {
name: String,
},
/// Начало новой транзакции
/// Создает контекст транзакции для атомарного выполнения группы команд
///
/// # Поля
/// * `transaction_id` - Уникальный идентификатор транзакции
/// Используется для коммита или отката
///
/// # Пример использования
/// ```json
/// {
/// "transaction_id": "tx_user_update_001"
/// }
/// ```
BeginTransaction {
transaction_id: String,
},
/// Подтверждение (коммит) транзакции
/// Выполняет все команды в транзакции атомарно
///
/// # Поля
/// * `transaction_id` - Идентификатор транзакции для коммита
///
/// # Пример использования
/// ```json
/// {
/// "transaction_id": "tx_user_update_001"
/// }
/// ```
CommitTransaction {
transaction_id: String,
},
/// Откат (rollback) транзакции
/// Отменяет все изменения, сделанные в рамках транзакции
///
/// # Поля
/// * `transaction_id` - Идентификатор транзакции для отката
///
/// # Пример использования
/// ```json
/// {
/// "transaction_id": "tx_user_update_001"
/// }
/// ```
RollbackTransaction {
transaction_id: String,
},
/// Создание индекса для указанной коллекции
/// Индекс ускоряет поиск документов по определенным полям
///
/// # Поля
/// * `collection` - Имя коллекции, для которой создается индекс
/// * `index` - Структура индекса, определяющая параметры индексации
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "index": {
/// "name": "email_idx",
/// "index_type": "Secondary",
/// "field": "email",
/// "unique": true
/// }
/// }
/// ```
CreateIndex {
collection: String,
index: Index,
},
/// Запрос документов по значению индекса
/// Использует ранее созданный индекс для быстрого поиска
///
/// # Поля
/// * `collection` - Имя коллекции для поиска
/// * `index_name` - Имя индекса для использования
/// * `value` - Значение для поиска в индексе (сериализованное)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "index_name": "email_idx",
/// "value": "alice@example.com"
/// }
/// ```
QueryByIndex {
collection: String,
index_name: String,
value: Vec<u8>,
},
/// Добавление нового узла в кластер шардинга
/// Используется для горизонтального масштабирования системы
///
/// # Поля
/// * `node_id` - Уникальный идентификатор узла в кластере
/// * `address` - Сетевой адрес узла (например, "127.0.0.1:8081")
/// * `capacity` - Максимальная емкость узла в байтах
///
/// # Пример использования
/// ```json
/// {
/// "node_id": "node_1",
/// "address": "127.0.0.1:8081",
/// "capacity": 1073741824
/// }
/// ```
AddShardNode {
node_id: String,
address: String,
capacity: u64,
},
/// Удаление узла из кластера шардинга
/// Перед удалением рекомендуется мигрировать данные с узла
///
/// # Поля
/// * `node_id` - Идентификатор удаляемого узла
///
/// # Пример использования
/// ```json
/// {
/// "node_id": "node_1"
/// }
/// ```
RemoveShardNode {
node_id: String,
},
/// Миграция шарда (части данных) между узлами кластера
/// Используется для ребалансировки нагрузки и обслуживания узлов
///
/// # Поля
/// * `collection` - Имя коллекции, данные которой мигрируются
/// * `from_node` - Идентификатор исходного узла
/// * `to_node` - Идентификатор целевого узла
/// * `shard_key` - Ключ шарда для миграции (определяет диапазон данных)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "from_node": "node_1",
/// "to_node": "node_2",
/// "shard_key": "user_id"
/// }
/// ```
MigrateShard {
collection: String,
from_node: String,
to_node: String,
shard_key: String,
},
/// Команда ребалансировки кластера
/// Автоматически распределяет данные между узлами для оптимальной нагрузки
/// Не требует параметров - работает на основе текущего состояния кластера
///
/// # Пример использования
/// ```json
/// "RebalanceCluster"
/// ```
RebalanceCluster,
/// Получение текущего статуса кластера
/// Возвращает информацию о всех узлах, их загрузке и состоянии
/// Не требует параметров
///
/// # Пример использования
/// ```json
/// "GetClusterStatus"
/// ```
GetClusterStatus,
/// Добавление ограничения (constraint) к коллекции
/// Ограничения обеспечивают целостность данных (уникальность, проверки и т.д.)
///
/// # Поля
/// * `collection` - Имя коллекции для добавления ограничения
/// * `constraint_name` - Уникальное имя ограничения
/// * `constraint_type` - Тип ограничения ("unique", "not_null", "check" и т.д.)
/// * `field` - Поле, к которому применяется ограничение
/// * `value` - Значение ограничения (зависит от типа)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "constraint_name": "unique_email",
/// "constraint_type": "unique",
/// "field": "email",
/// "value": ""
/// }
/// ```
AddConstraint {
collection: String,
constraint_name: String,
constraint_type: String,
field: String,
value: Vec<u8>,
},
/// Удаление ограничения из коллекции
///
/// # Поля
/// * `collection` - Имя коллекции
/// * `constraint_name` - Имя удаляемого ограничения
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "constraint_name": "unique_email"
/// }
/// ```
RemoveConstraint {
collection: String,
constraint_name: String,
},
/// Включение сжатия данных для указанной коллекции
/// Уменьшает объем хранимых данных за счет CPU ресурсов
///
/// # Поля
/// * `collection` - Имя коллекции
/// * `algorithm` - Алгоритм сжатия ("gzip", "lz4", "zstd" и т.д.)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "logs",
/// "algorithm": "gzip"
/// }
/// ```
EnableCompression {
collection: String,
algorithm: String,
},
/// Отключение сжатия данных для указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции
///
/// # Пример использования
/// ```json
/// {
/// "collection": "logs"
/// }
/// ```
DisableCompression {
collection: String,
},
/// Создание глобального индекса (распределенного по всему кластеру)
/// Позволяет выполнять быстрый поиск по всем узлам кластера
///
/// # Поля
/// * `name` - Уникальное имя глобального индекса
/// * `field` - Поле для индексации
/// * `unique` - Флаг уникальности индекса
///
/// # Пример использования
/// ```json
/// {
/// "name": "global_user_email",
/// "field": "email",
/// "unique": true
/// }
/// ```
CreateGlobalIndex {
name: String,
field: String,
unique: bool,
},
/// Запрос по глобальному индексу
/// Выполняет поиск по всем узлам кластера с использованием глобального индекса
///
/// # Поля
/// * `index_name` - Имя глобального индекса
/// * `value` - Значение для поиска (сериализованное)
///
/// # Пример использования
/// ```json
/// {
/// "index_name": "global_user_email",
/// "value": "alice@example.com"
/// }
/// ```
QueryGlobalIndex {
index_name: String,
value: Vec<u8>,
},
}
/// Ответы от базы данных Futriix
/// Используются для возврата результатов выполнения команд клиентам.
/// Каждый ответ содержит либо успешный результат с данными, либо описание ошибки.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
/// Успешное выполнение команды с возвращаемыми данными
///
/// # Параметры
/// * `Vec<u8>` - Сериализованные данные результата
/// Формат зависит от выполненной команды:
/// - Для Create: ID созданного документа (String)
/// - Для Read: документ в формате JSON
/// - Для Query: массив документов в формате JSON
/// - Для QueryByIndex: массив ID документов
///
/// # Пример
/// ```json
/// {"Success": "550e8400-e29b-41d4-a716-446655440000"}
/// ```
Success(Vec<u8>),
/// Ошибка выполнения команды с описанием проблемы
///
/// # Параметры
/// * `String` - Человеко-читаемое описание ошибки
///
/// # Пример
/// ```json
/// {"Error": "Document not found: 550e8400-e29b-41d4-a716-446655440000"}
/// ```
Error(String),
}
/// Сообщение для репликации команд между узлами кластера
/// Обеспечивает согласованное состояние данных на всех узлах.
/// Каждое сообщение имеет последовательный номер для гарантии порядка доставки.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMessage {
/// Последовательный номер сообщения
/// Увеличивается монотонно для каждого нового сообщения
/// Используется для:
/// 1. Гарантии порядка применения команд на репликах
/// 2. Обнаружения пропущенных сообщений (gap detection)
/// 3. Идемпотентной обработки (дублирование сообщений с тем же sequence игнорируется)
pub sequence: u64,
/// Команда для репликации
/// Содержит операцию, которую нужно выполнить на всех репликах
/// Все команды из перечисления Command могут быть реплицированы
pub command: Command,
/// Временная метка создания сообщения
/// Используется для:
/// 1. Отслеживания задержек репликации
/// 2. Разрешения конфликтов (последняя запись побеждает)
/// 3. Аудита и отладки
/// Формат: Unix timestamp в секундах
pub timestamp: i64,
}
/// Структура для информации о шарде (части данных на узле)
/// Используется для мониторинга состояния распределенной системы
/// и принятия решений о балансировке нагрузки.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardInfo {
/// Уникальный идентификатор узла в кластере
/// Формат: строка, обычно "node_" + уникальный суффикс
/// Пример: "node_1", "node_abc123"
pub node_id: String,
/// Сетевой адрес узла
/// Формат: "IP:PORT" или "hostname:PORT"
/// Пример: "127.0.0.1:8081", "db-node-1.example.com:8081"
pub address: String,
/// Общая емкость узла в байтах
/// Максимальный объем данных, который может хранить узел
/// Пример: 1073741824 (1GB), 5368709120 (5GB)
pub capacity: u64,
/// Используемая емкость узла в байтах
/// Текущий объем данных, хранящихся на узле
/// Всегда меньше или равен capacity
pub used: u64,
/// Список коллекций, хранящихся на узле
/// Каждый узел может хранить несколько коллекций или части коллекций
/// Пример: ["users", "orders", "logs"]
pub collections: Vec<String>,
}
/// Структура для статуса кластера
/// Содержит полную информацию о состоянии распределенной системы.
/// Используется для мониторинга, принятия решений о балансировке
/// и отображения состояния администраторам.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
/// Список всех узлов в кластере с их текущим состоянием
/// Каждый элемент содержит подробную информацию об узле
pub nodes: Vec<ShardInfo>,
/// Общая емкость кластера в байтах
/// Сумма capacity всех узлов
pub total_capacity: u64,
/// Общая используемая емкость кластера в байтах
/// Сумма used всех узлов
pub total_used: u64,
/// Флаг необходимости ребалансировки кластера
/// true: распределение данных между узлами неравномерно
/// false: данные распределены оптимально
/// Определяется на основе разницы в использовании емкости между узлами
pub rebalance_needed: bool,
}
/// Wait-Free сериализация сообщений с использованием MessagePack
/// Преобразует структуры данных в компактный бинарный формат для передачи по сети.
///
/// # Типовые параметры
/// * `T` - Тип сериализуемого значения, должен реализовывать `serde::Serialize`
///
/// # Аргументы
/// * `value` - Ссылка на значение для сериализации
///
/// # Возвращаемое значение
/// * `crate::common::error::Result<Vec<u8>>` - Сериализованные данные или ошибка
///
/// # Пример использования
/// ```rust
/// let command = Command::Create {
/// collection: "users".to_string(),
/// document: b"{\"name\":\"Alice\"}".to_vec(),
/// };
///
/// let serialized = serialize(&command)?;
/// // Отправка serialized по сети...
/// ```
pub fn serialize<T: serde::Serialize>(value: &T) -> crate::common::error::Result<Vec<u8>> {
// Используем rmp_serde для сериализации в MessagePack
// MessagePack выбран из-за:
// 1. Компактности (меньше JSON)
// 2. Быстроты сериализации/десериализации
// 3. Поддержки бинарных данных без кодирования
rmp_serde::to_vec(value)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
/// Wait-Free десериализация сообщений с использованием MessagePack
/// Преобразует бинарные данные обратно в структуры Rust.
/// Обратная операция для функции `serialize`.
///
/// # Типовые параметры
/// * `T` - Тип десериализуемого значения, должен реализовывать `serde::Deserialize<'a>`
///
/// # Аргументы
/// * `bytes` - Ссылка на байты для десериализации
///
/// # Возвращаемое значение
/// * `crate::common::error::Result<T>` - Десериализованное значение или ошибка
///
/// # Пример использования
/// ```rust
/// // Получение bytes из сети...
/// let command: Command = deserialize(&bytes)?;
/// // Использование command...
/// ```
pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::error::Result<T> {
// Используем rmp_serde для десериализации из MessagePack
// Обрабатываем ошибки парсинга и несоответствия типов
rmp_serde::from_slice(bytes)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
// Дополнительные структуры для расширения функциональности (закомментированы для будущего использования)
/*
/// Структура для информации о репликационном лаге
/// Используется для мониторинга отставания реплик от мастера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationLag {
pub node_id: String,
pub lag_seconds: u64,
pub last_applied_sequence: u64,
}
/// Структура для статистики выполнения команд
/// Используется для мониторинга производительности и выявления узких мест
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandStats {
pub command_type: String,
pub count: u64,
pub avg_duration_ms: f64,
pub error_rate: f64,
}
/// Структура для информации о здоровье узла
/// Используется для проверки доступности и состояния узлов кластера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheck {
pub node_id: String,
pub status: String, // "healthy", "degraded", "unhealthy"
pub last_check: i64,
pub details: String,
}
*/
// Примечания по использованию протокола:
//
// 1. Сериализация:
// - Все структуры автоматически сериализуются с помощью derive(Serialize)
// - Используется MessagePack для максимальной эффективности
// - Бинарный формат обеспечивает компактность и скорость
//
// 2. Обработка ошибок:
// - Все ошибки сериализации/десериализации преобразуются в FalcotError
// - Клиенты должны обрабатывать возможные ошибки сети и парсинга
//
// 3. Совместимость:
// - Добавление новых вариантов Command обратно совместимо
// - Удаление или изменение существующих вариантов требует миграции
// - Все изменения в структурах должны тестироваться на совместимость
//
// 4. Производительность:
// - MessagePack быстрее JSON для сериализации/десериализации
// - Использование Vec<u8> для документов избегает лишних копирований
// - Wait-free операции не блокируют потоки выполнения
//
// 5. Безопасность:
// - Все входящие данные должны валидироваться перед использованием
// - Ограничение максимального размера сообщений для предотвращения DoS
// - Аутентификация и авторизация на более высоком уровне