diff --git a/protocol.rs b/protocol.rs new file mode 100644 index 0000000..4e4c227 --- /dev/null +++ b/protocol.rs @@ -0,0 +1,690 @@ +// src/common/protocol.rs +//! Протокол обмена данными для Futriix с wait-free архитектурой +//! +//! Этот модуль определяет структуры команд и ответов для взаимодействия между +//! компонентами системы Futriix. Используется для сериализации и десериализации +//! сообщений между клиентами и сервером, а также между узлами кластера. +//! +//! Основные компоненты: +//! - Command: Перечисление всех поддерживаемых команд базы данных +//! - Response: Перечисление возможных ответов от сервера +//! - ReplicationMessage: Структура для репликации команд между узлами +//! - Шардинг и кластерные структуры (ShardInfo, ClusterStatus, RaftNodeInfo) +//! - Функции сериализации/десериализации с использованием MessagePack +//! +//! Особенности: +//! - Wait-free сериализация с использованием MessagePack (rmp-serde) +//! - Поддержка всех операций CRUD, транзакций, индексов и шардинга +//! - Типизированные структуры для строгой проверки данных +//! - Поддержка репликации с последовательными номерами и временными метками + +#![allow(dead_code)] // Разрешаем неиспользуемый код для будущего расширения + +use serde::{Deserialize, Serialize}; +use crate::server::database::Index; // Тип индекса из модуля базы данных + +/// Команды для выполнения в базе данных Futriix +/// Используются для передачи операций от клиентов к серверу и между узлами кластера. +/// Каждая команда содержит все необходимые данные для выполнения соответствующей операции. +/// Сериализуется в MessagePack для компактной передачи по сети. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Command { + /// Создание нового документа в указанной коллекции + /// + /// # Поля + /// * `collection` - Имя коллекции, в которую будет добавлен документ + /// * `document` - Сериализованный документ в формате JSON (Vec) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "document": {"name": "Alice", "age": 30} + /// } + /// ``` + Create { + collection: String, + document: Vec, + }, + + /// Чтение документа по идентификатору из указанной коллекции + /// + /// # Поля + /// * `collection` - Имя коллекции, из которой будет прочитан документ + /// * `id` - Уникальный идентификатор документа (обычно UUID) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "id": "550e8400-e29b-41d4-a716-446655440000" + /// } + /// ``` + Read { + collection: String, + id: String, + }, + + /// Обновление существующего документа по идентификатору + /// + /// # Поля + /// * `collection` - Имя коллекции, содержащей документ + /// * `id` - Уникальный идентификатор обновляемого документа + /// * `document` - Новая версия документа в формате JSON (Vec) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "id": "550e8400-e29b-41d4-a716-446655440000", + /// "document": {"name": "Alice", "age": 31} + /// } + /// ``` + Update { + collection: String, + id: String, + document: Vec, + }, + + /// Удаление документа по идентификатору из указанной коллекции + /// + /// # Поля + /// * `collection` - Имя коллекции, из которой будет удален документ + /// * `id` - Уникальный идентификатор удаляемого документа + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "id": "550e8400-e29b-41d4-a716-446655440000" + /// } + /// ``` + Delete { + collection: String, + id: String, + }, + + /// Запрос документов из коллекции с применением фильтра + /// + /// # Поля + /// * `collection` - Имя коллекции для поиска + /// * `filter` - Сериализованный фильтр в формате JSON (Vec) + /// Если пустой вектор - возвращаются все документы + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "filter": {"age": {"$gt": 25}} + /// } + /// ``` + Query { + collection: String, + filter: Vec, + }, + + /// Создание хранимой процедуры (Lua скрипта) на сервере + /// + /// # Поля + /// * `name` - Уникальное имя процедуры для последующего вызова + /// * `code` - Lua код процедуры в виде вектора байтов + /// + /// # Пример использования + /// ```json + /// { + /// "name": "calculate_stats", + /// "code": "function calculate() return 42 end" + /// } + /// ``` + CreateProcedure { + name: String, + code: Vec, + }, + + /// Вызов ранее созданной хранимой процедуры + /// + /// # Поля + /// * `name` - Имя процедуры для вызова + /// + /// # Пример использования + /// ```json + /// { + /// "name": "calculate_stats" + /// } + /// ``` + CallProcedure { + name: String, + }, + + /// Начало новой транзакции + /// Создает контекст транзакции для атомарного выполнения группы команд + /// + /// # Поля + /// * `transaction_id` - Уникальный идентификатор транзакции + /// Используется для коммита или отката + /// + /// # Пример использования + /// ```json + /// { + /// "transaction_id": "tx_user_update_001" + /// } + /// ``` + BeginTransaction { + transaction_id: String, + }, + + /// Подтверждение (коммит) транзакции + /// Выполняет все команды в транзакции атомарно + /// + /// # Поля + /// * `transaction_id` - Идентификатор транзакции для коммита + /// + /// # Пример использования + /// ```json + /// { + /// "transaction_id": "tx_user_update_001" + /// } + /// ``` + CommitTransaction { + transaction_id: String, + }, + + /// Откат (rollback) транзакции + /// Отменяет все изменения, сделанные в рамках транзакции + /// + /// # Поля + /// * `transaction_id` - Идентификатор транзакции для отката + /// + /// # Пример использования + /// ```json + /// { + /// "transaction_id": "tx_user_update_001" + /// } + /// ``` + RollbackTransaction { + transaction_id: String, + }, + + /// Создание индекса для указанной коллекции + /// Индекс ускоряет поиск документов по определенным полям + /// + /// # Поля + /// * `collection` - Имя коллекции, для которой создается индекс + /// * `index` - Структура индекса, определяющая параметры индексации + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "index": { + /// "name": "email_idx", + /// "index_type": "Secondary", + /// "field": "email", + /// "unique": true + /// } + /// } + /// ``` + CreateIndex { + collection: String, + index: Index, + }, + + /// Запрос документов по значению индекса + /// Использует ранее созданный индекс для быстрого поиска + /// + /// # Поля + /// * `collection` - Имя коллекции для поиска + /// * `index_name` - Имя индекса для использования + /// * `value` - Значение для поиска в индексе (сериализованное) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "index_name": "email_idx", + /// "value": "alice@example.com" + /// } + /// ``` + QueryByIndex { + collection: String, + index_name: String, + value: Vec, + }, + + /// Добавление нового узла в кластер шардинга + /// Используется для горизонтального масштабирования системы + /// + /// # Поля + /// * `node_id` - Уникальный идентификатор узла в кластере + /// * `address` - Сетевой адрес узла (например, "127.0.0.1:8081") + /// * `capacity` - Максимальная емкость узла в байтах + /// + /// # Пример использования + /// ```json + /// { + /// "node_id": "node_1", + /// "address": "127.0.0.1:8081", + /// "capacity": 1073741824 + /// } + /// ``` + AddShardNode { + node_id: String, + address: String, + capacity: u64, + }, + + /// Удаление узла из кластера шардинга + /// Перед удалением рекомендуется мигрировать данные с узла + /// + /// # Поля + /// * `node_id` - Идентификатор удаляемого узла + /// + /// # Пример использования + /// ```json + /// { + /// "node_id": "node_1" + /// } + /// ``` + RemoveShardNode { + node_id: String, + }, + + /// Миграция шарда (части данных) между узлами кластера + /// Используется для ребалансировки нагрузки и обслуживания узлов + /// + /// # Поля + /// * `collection` - Имя коллекции, данные которой мигрируются + /// * `from_node` - Идентификатор исходного узла + /// * `to_node` - Идентификатор целевого узла + /// * `shard_key` - Ключ шарда для миграции (определяет диапазон данных) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "from_node": "node_1", + /// "to_node": "node_2", + /// "shard_key": "user_id" + /// } + /// ``` + MigrateShard { + collection: String, + from_node: String, + to_node: String, + shard_key: String, + }, + + /// Команда ребалансировки кластера + /// Автоматически распределяет данные между узлами для оптимальной нагрузки + /// Не требует параметров - работает на основе текущего состояния кластера + /// + /// # Пример использования + /// ```json + /// "RebalanceCluster" + /// ``` + RebalanceCluster, + + /// Получение текущего статуса кластера + /// Возвращает информацию о всех узлах, их загрузке и состоянии + /// Не требует параметров + /// + /// # Пример использования + /// ```json + /// "GetClusterStatus" + /// ``` + GetClusterStatus, + + /// Добавление ограничения (constraint) к коллекции + /// Ограничения обеспечивают целостность данных (уникальность, проверки и т.д.) + /// + /// # Поля + /// * `collection` - Имя коллекции для добавления ограничения + /// * `constraint_name` - Уникальное имя ограничения + /// * `constraint_type` - Тип ограничения ("unique", "not_null", "check" и т.д.) + /// * `field` - Поле, к которому применяется ограничение + /// * `value` - Значение ограничения (зависит от типа) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "constraint_name": "unique_email", + /// "constraint_type": "unique", + /// "field": "email", + /// "value": "" + /// } + /// ``` + AddConstraint { + collection: String, + constraint_name: String, + constraint_type: String, + field: String, + value: Vec, + }, + + /// Удаление ограничения из коллекции + /// + /// # Поля + /// * `collection` - Имя коллекции + /// * `constraint_name` - Имя удаляемого ограничения + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "users", + /// "constraint_name": "unique_email" + /// } + /// ``` + RemoveConstraint { + collection: String, + constraint_name: String, + }, + + /// Включение сжатия данных для указанной коллекции + /// Уменьшает объем хранимых данных за счет CPU ресурсов + /// + /// # Поля + /// * `collection` - Имя коллекции + /// * `algorithm` - Алгоритм сжатия ("gzip", "lz4", "zstd" и т.д.) + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "logs", + /// "algorithm": "gzip" + /// } + /// ``` + EnableCompression { + collection: String, + algorithm: String, + }, + + /// Отключение сжатия данных для указанной коллекции + /// + /// # Поля + /// * `collection` - Имя коллекции + /// + /// # Пример использования + /// ```json + /// { + /// "collection": "logs" + /// } + /// ``` + DisableCompression { + collection: String, + }, + + /// Создание глобального индекса (распределенного по всему кластеру) + /// Позволяет выполнять быстрый поиск по всем узлам кластера + /// + /// # Поля + /// * `name` - Уникальное имя глобального индекса + /// * `field` - Поле для индексации + /// * `unique` - Флаг уникальности индекса + /// + /// # Пример использования + /// ```json + /// { + /// "name": "global_user_email", + /// "field": "email", + /// "unique": true + /// } + /// ``` + CreateGlobalIndex { + name: String, + field: String, + unique: bool, + }, + + /// Запрос по глобальному индексу + /// Выполняет поиск по всем узлам кластера с использованием глобального индекса + /// + /// # Поля + /// * `index_name` - Имя глобального индекса + /// * `value` - Значение для поиска (сериализованное) + /// + /// # Пример использования + /// ```json + /// { + /// "index_name": "global_user_email", + /// "value": "alice@example.com" + /// } + /// ``` + QueryGlobalIndex { + index_name: String, + value: Vec, + }, +} + +/// Ответы от базы данных Futriix +/// Используются для возврата результатов выполнения команд клиентам. +/// Каждый ответ содержит либо успешный результат с данными, либо описание ошибки. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Response { + /// Успешное выполнение команды с возвращаемыми данными + /// + /// # Параметры + /// * `Vec` - Сериализованные данные результата + /// Формат зависит от выполненной команды: + /// - Для Create: ID созданного документа (String) + /// - Для Read: документ в формате JSON + /// - Для Query: массив документов в формате JSON + /// - Для QueryByIndex: массив ID документов + /// + /// # Пример + /// ```json + /// {"Success": "550e8400-e29b-41d4-a716-446655440000"} + /// ``` + Success(Vec), + + /// Ошибка выполнения команды с описанием проблемы + /// + /// # Параметры + /// * `String` - Человеко-читаемое описание ошибки + /// + /// # Пример + /// ```json + /// {"Error": "Document not found: 550e8400-e29b-41d4-a716-446655440000"} + /// ``` + Error(String), +} + +/// Сообщение для репликации команд между узлами кластера +/// Обеспечивает согласованное состояние данных на всех узлах. +/// Каждое сообщение имеет последовательный номер для гарантии порядка доставки. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplicationMessage { + /// Последовательный номер сообщения + /// Увеличивается монотонно для каждого нового сообщения + /// Используется для: + /// 1. Гарантии порядка применения команд на репликах + /// 2. Обнаружения пропущенных сообщений (gap detection) + /// 3. Идемпотентной обработки (дублирование сообщений с тем же sequence игнорируется) + pub sequence: u64, + + /// Команда для репликации + /// Содержит операцию, которую нужно выполнить на всех репликах + /// Все команды из перечисления Command могут быть реплицированы + pub command: Command, + + /// Временная метка создания сообщения + /// Используется для: + /// 1. Отслеживания задержек репликации + /// 2. Разрешения конфликтов (последняя запись побеждает) + /// 3. Аудита и отладки + /// Формат: Unix timestamp в секундах + pub timestamp: i64, +} + +/// Структура для информации о шарде (части данных на узле) +/// Используется для мониторинга состояния распределенной системы +/// и принятия решений о балансировке нагрузки. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShardInfo { + /// Уникальный идентификатор узла в кластере + /// Формат: строка, обычно "node_" + уникальный суффикс + /// Пример: "node_1", "node_abc123" + pub node_id: String, + + /// Сетевой адрес узла + /// Формат: "IP:PORT" или "hostname:PORT" + /// Пример: "127.0.0.1:8081", "db-node-1.example.com:8081" + pub address: String, + + /// Общая емкость узла в байтах + /// Максимальный объем данных, который может хранить узел + /// Пример: 1073741824 (1GB), 5368709120 (5GB) + pub capacity: u64, + + /// Используемая емкость узла в байтах + /// Текущий объем данных, хранящихся на узле + /// Всегда меньше или равен capacity + pub used: u64, + + /// Список коллекций, хранящихся на узле + /// Каждый узел может хранить несколько коллекций или части коллекций + /// Пример: ["users", "orders", "logs"] + pub collections: Vec, +} + +/// Структура для статуса кластера +/// Содержит полную информацию о состоянии распределенной системы. +/// Используется для мониторинга, принятия решений о балансировке +/// и отображения состояния администраторам. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterStatus { + /// Список всех узлов в кластере с их текущим состоянием + /// Каждый элемент содержит подробную информацию об узле + pub nodes: Vec, + + /// Общая емкость кластера в байтах + /// Сумма capacity всех узлов + pub total_capacity: u64, + + /// Общая используемая емкость кластера в байтах + /// Сумма used всех узлов + pub total_used: u64, + + /// Флаг необходимости ребалансировки кластера + /// true: распределение данных между узлами неравномерно + /// false: данные распределены оптимально + /// Определяется на основе разницы в использовании емкости между узлами + pub rebalance_needed: bool, +} + +/// Wait-Free сериализация сообщений с использованием MessagePack +/// Преобразует структуры данных в компактный бинарный формат для передачи по сети. +/// +/// # Типовые параметры +/// * `T` - Тип сериализуемого значения, должен реализовывать `serde::Serialize` +/// +/// # Аргументы +/// * `value` - Ссылка на значение для сериализации +/// +/// # Возвращаемое значение +/// * `crate::common::error::Result>` - Сериализованные данные или ошибка +/// +/// # Пример использования +/// ```rust +/// let command = Command::Create { +/// collection: "users".to_string(), +/// document: b"{\"name\":\"Alice\"}".to_vec(), +/// }; +/// +/// let serialized = serialize(&command)?; +/// // Отправка serialized по сети... +/// ``` +pub fn serialize(value: &T) -> crate::common::error::Result> { + // Используем rmp_serde для сериализации в MessagePack + // MessagePack выбран из-за: + // 1. Компактности (меньше JSON) + // 2. Быстроты сериализации/десериализации + // 3. Поддержки бинарных данных без кодирования + rmp_serde::to_vec(value) + .map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string())) +} + +/// Wait-Free десериализация сообщений с использованием MessagePack +/// Преобразует бинарные данные обратно в структуры Rust. +/// Обратная операция для функции `serialize`. +/// +/// # Типовые параметры +/// * `T` - Тип десериализуемого значения, должен реализовывать `serde::Deserialize<'a>` +/// +/// # Аргументы +/// * `bytes` - Ссылка на байты для десериализации +/// +/// # Возвращаемое значение +/// * `crate::common::error::Result` - Десериализованное значение или ошибка +/// +/// # Пример использования +/// ```rust +/// // Получение bytes из сети... +/// let command: Command = deserialize(&bytes)?; +/// // Использование command... +/// ``` +pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::error::Result { + // Используем rmp_serde для десериализации из MessagePack + // Обрабатываем ошибки парсинга и несоответствия типов + rmp_serde::from_slice(bytes) + .map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string())) +} + +// Дополнительные структуры для расширения функциональности (закомментированы для будущего использования) + +/* +/// Структура для информации о репликационном лаге +/// Используется для мониторинга отставания реплик от мастера +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplicationLag { + pub node_id: String, + pub lag_seconds: u64, + pub last_applied_sequence: u64, +} + +/// Структура для статистики выполнения команд +/// Используется для мониторинга производительности и выявления узких мест +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandStats { + pub command_type: String, + pub count: u64, + pub avg_duration_ms: f64, + pub error_rate: f64, +} + +/// Структура для информации о здоровье узла +/// Используется для проверки доступности и состояния узлов кластера +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthCheck { + pub node_id: String, + pub status: String, // "healthy", "degraded", "unhealthy" + pub last_check: i64, + pub details: String, +} +*/ + +// Примечания по использованию протокола: +// +// 1. Сериализация: +// - Все структуры автоматически сериализуются с помощью derive(Serialize) +// - Используется MessagePack для максимальной эффективности +// - Бинарный формат обеспечивает компактность и скорость +// +// 2. Обработка ошибок: +// - Все ошибки сериализации/десериализации преобразуются в FalcotError +// - Клиенты должны обрабатывать возможные ошибки сети и парсинга +// +// 3. Совместимость: +// - Добавление новых вариантов Command обратно совместимо +// - Удаление или изменение существующих вариантов требует миграции +// - Все изменения в структурах должны тестироваться на совместимость +// +// 4. Производительность: +// - MessagePack быстрее JSON для сериализации/десериализации +// - Использование Vec для документов избегает лишних копирований +// - Wait-free операции не блокируют потоки выполнения +// +// 5. Безопасность: +// - Все входящие данные должны валидироваться перед использованием +// - Ограничение максимального размера сообщений для предотвращения DoS +// - Аутентификация и авторизация на более высоком уровне