// src/common/protocol.rs //! Протокол обмена данными для Futriix с wait-free архитектурой //! //! Этот модуль определяет структуры команд и ответов для взаимодействия между //! компонентами системы Futriix. Используется для сериализации и десериализации //! сообщений между клиентами и сервером, а также между узлами кластера. //! //! Основные компоненты: //! - Command: Перечисление всех поддерживаемых команд базы данных //! - Response: Перечисление возможных ответов от сервера //! - ReplicationMessage: Структура для репликации команд между узлами //! - Шардинг и кластерные структуры (ShardInfo, ClusterStatus, RaftNodeInfo) //! - Функции сериализации/десериализации с использованием MessagePack //! //! Особенности: //! - Wait-free сериализация с использованием MessagePack (rmp-serde) //! - Поддержка всех операций CRUD, транзакций, индексов и шардинга //! - Типизированные структуры для строгой проверки данных //! - Поддержка репликации с последовательными номерами и временными метками #![allow(dead_code)] // Разрешаем неиспользуемый код для будущего расширения use serde::{Deserialize, Serialize}; use crate::server::database::Index; // Тип индекса из модуля базы данных /// Команды для выполнения в базе данных Futriix /// Используются для передачи операций от клиентов к серверу и между узлами кластера. /// Каждая команда содержит все необходимые данные для выполнения соответствующей операции. /// Сериализуется в MessagePack для компактной передачи по сети. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Command { /// Создание нового документа в указанной коллекции /// /// # Поля /// * `collection` - Имя коллекции, в которую будет добавлен документ /// * `document` - Сериализованный документ в формате JSON (Vec) /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "document": {"name": "Alice", "age": 30} /// } /// ``` Create { collection: String, document: Vec, }, /// Чтение документа по идентификатору из указанной коллекции /// /// # Поля /// * `collection` - Имя коллекции, из которой будет прочитан документ /// * `id` - Уникальный идентификатор документа (обычно UUID) /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "id": "550e8400-e29b-41d4-a716-446655440000" /// } /// ``` Read { collection: String, id: String, }, /// Обновление существующего документа по идентификатору /// /// # Поля /// * `collection` - Имя коллекции, содержащей документ /// * `id` - Уникальный идентификатор обновляемого документа /// * `document` - Новая версия документа в формате JSON (Vec) /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "id": "550e8400-e29b-41d4-a716-446655440000", /// "document": {"name": "Alice", "age": 31} /// } /// ``` Update { collection: String, id: String, document: Vec, }, /// Удаление документа по идентификатору из указанной коллекции /// /// # Поля /// * `collection` - Имя коллекции, из которой будет удален документ /// * `id` - Уникальный идентификатор удаляемого документа /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "id": "550e8400-e29b-41d4-a716-446655440000" /// } /// ``` Delete { collection: String, id: String, }, /// Запрос документов из коллекции с применением фильтра /// /// # Поля /// * `collection` - Имя коллекции для поиска /// * `filter` - Сериализованный фильтр в формате JSON (Vec) /// Если пустой вектор - возвращаются все документы /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "filter": {"age": {"$gt": 25}} /// } /// ``` Query { collection: String, filter: Vec, }, /// Создание хранимой процедуры (Lua скрипта) на сервере /// /// # Поля /// * `name` - Уникальное имя процедуры для последующего вызова /// * `code` - Lua код процедуры в виде вектора байтов /// /// # Пример использования /// ```json /// { /// "name": "calculate_stats", /// "code": "function calculate() return 42 end" /// } /// ``` CreateProcedure { name: String, code: Vec, }, /// Вызов ранее созданной хранимой процедуры /// /// # Поля /// * `name` - Имя процедуры для вызова /// /// # Пример использования /// ```json /// { /// "name": "calculate_stats" /// } /// ``` CallProcedure { name: String, }, /// Начало новой транзакции /// Создает контекст транзакции для атомарного выполнения группы команд /// /// # Поля /// * `transaction_id` - Уникальный идентификатор транзакции /// Используется для коммита или отката /// /// # Пример использования /// ```json /// { /// "transaction_id": "tx_user_update_001" /// } /// ``` BeginTransaction { transaction_id: String, }, /// Подтверждение (коммит) транзакции /// Выполняет все команды в транзакции атомарно /// /// # Поля /// * `transaction_id` - Идентификатор транзакции для коммита /// /// # Пример использования /// ```json /// { /// "transaction_id": "tx_user_update_001" /// } /// ``` CommitTransaction { transaction_id: String, }, /// Откат (rollback) транзакции /// Отменяет все изменения, сделанные в рамках транзакции /// /// # Поля /// * `transaction_id` - Идентификатор транзакции для отката /// /// # Пример использования /// ```json /// { /// "transaction_id": "tx_user_update_001" /// } /// ``` RollbackTransaction { transaction_id: String, }, /// Создание индекса для указанной коллекции /// Индекс ускоряет поиск документов по определенным полям /// /// # Поля /// * `collection` - Имя коллекции, для которой создается индекс /// * `index` - Структура индекса, определяющая параметры индексации /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "index": { /// "name": "email_idx", /// "index_type": "Secondary", /// "field": "email", /// "unique": true /// } /// } /// ``` CreateIndex { collection: String, index: Index, }, /// Запрос документов по значению индекса /// Использует ранее созданный индекс для быстрого поиска /// /// # Поля /// * `collection` - Имя коллекции для поиска /// * `index_name` - Имя индекса для использования /// * `value` - Значение для поиска в индексе (сериализованное) /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "index_name": "email_idx", /// "value": "alice@example.com" /// } /// ``` QueryByIndex { collection: String, index_name: String, value: Vec, }, /// Добавление нового узла в кластер шардинга /// Используется для горизонтального масштабирования системы /// /// # Поля /// * `node_id` - Уникальный идентификатор узла в кластере /// * `address` - Сетевой адрес узла (например, "127.0.0.1:8081") /// * `capacity` - Максимальная емкость узла в байтах /// /// # Пример использования /// ```json /// { /// "node_id": "node_1", /// "address": "127.0.0.1:8081", /// "capacity": 1073741824 /// } /// ``` AddShardNode { node_id: String, address: String, capacity: u64, }, /// Удаление узла из кластера шардинга /// Перед удалением рекомендуется мигрировать данные с узла /// /// # Поля /// * `node_id` - Идентификатор удаляемого узла /// /// # Пример использования /// ```json /// { /// "node_id": "node_1" /// } /// ``` RemoveShardNode { node_id: String, }, /// Миграция шарда (части данных) между узлами кластера /// Используется для ребалансировки нагрузки и обслуживания узлов /// /// # Поля /// * `collection` - Имя коллекции, данные которой мигрируются /// * `from_node` - Идентификатор исходного узла /// * `to_node` - Идентификатор целевого узла /// * `shard_key` - Ключ шарда для миграции (определяет диапазон данных) /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "from_node": "node_1", /// "to_node": "node_2", /// "shard_key": "user_id" /// } /// ``` MigrateShard { collection: String, from_node: String, to_node: String, shard_key: String, }, /// Команда ребалансировки кластера /// Автоматически распределяет данные между узлами для оптимальной нагрузки /// Не требует параметров - работает на основе текущего состояния кластера /// /// # Пример использования /// ```json /// "RebalanceCluster" /// ``` RebalanceCluster, /// Получение текущего статуса кластера /// Возвращает информацию о всех узлах, их загрузке и состоянии /// Не требует параметров /// /// # Пример использования /// ```json /// "GetClusterStatus" /// ``` GetClusterStatus, /// Добавление ограничения (constraint) к коллекции /// Ограничения обеспечивают целостность данных (уникальность, проверки и т.д.) /// /// # Поля /// * `collection` - Имя коллекции для добавления ограничения /// * `constraint_name` - Уникальное имя ограничения /// * `constraint_type` - Тип ограничения ("unique", "not_null", "check" и т.д.) /// * `field` - Поле, к которому применяется ограничение /// * `value` - Значение ограничения (зависит от типа) /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "constraint_name": "unique_email", /// "constraint_type": "unique", /// "field": "email", /// "value": "" /// } /// ``` AddConstraint { collection: String, constraint_name: String, constraint_type: String, field: String, value: Vec, }, /// Удаление ограничения из коллекции /// /// # Поля /// * `collection` - Имя коллекции /// * `constraint_name` - Имя удаляемого ограничения /// /// # Пример использования /// ```json /// { /// "collection": "users", /// "constraint_name": "unique_email" /// } /// ``` RemoveConstraint { collection: String, constraint_name: String, }, /// Включение сжатия данных для указанной коллекции /// Уменьшает объем хранимых данных за счет CPU ресурсов /// /// # Поля /// * `collection` - Имя коллекции /// * `algorithm` - Алгоритм сжатия ("gzip", "lz4", "zstd" и т.д.) /// /// # Пример использования /// ```json /// { /// "collection": "logs", /// "algorithm": "gzip" /// } /// ``` EnableCompression { collection: String, algorithm: String, }, /// Отключение сжатия данных для указанной коллекции /// /// # Поля /// * `collection` - Имя коллекции /// /// # Пример использования /// ```json /// { /// "collection": "logs" /// } /// ``` DisableCompression { collection: String, }, /// Создание глобального индекса (распределенного по всему кластеру) /// Позволяет выполнять быстрый поиск по всем узлам кластера /// /// # Поля /// * `name` - Уникальное имя глобального индекса /// * `field` - Поле для индексации /// * `unique` - Флаг уникальности индекса /// /// # Пример использования /// ```json /// { /// "name": "global_user_email", /// "field": "email", /// "unique": true /// } /// ``` CreateGlobalIndex { name: String, field: String, unique: bool, }, /// Запрос по глобальному индексу /// Выполняет поиск по всем узлам кластера с использованием глобального индекса /// /// # Поля /// * `index_name` - Имя глобального индекса /// * `value` - Значение для поиска (сериализованное) /// /// # Пример использования /// ```json /// { /// "index_name": "global_user_email", /// "value": "alice@example.com" /// } /// ``` QueryGlobalIndex { index_name: String, value: Vec, }, } /// Ответы от базы данных Futriix /// Используются для возврата результатов выполнения команд клиентам. /// Каждый ответ содержит либо успешный результат с данными, либо описание ошибки. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Response { /// Успешное выполнение команды с возвращаемыми данными /// /// # Параметры /// * `Vec` - Сериализованные данные результата /// Формат зависит от выполненной команды: /// - Для Create: ID созданного документа (String) /// - Для Read: документ в формате JSON /// - Для Query: массив документов в формате JSON /// - Для QueryByIndex: массив ID документов /// /// # Пример /// ```json /// {"Success": "550e8400-e29b-41d4-a716-446655440000"} /// ``` Success(Vec), /// Ошибка выполнения команды с описанием проблемы /// /// # Параметры /// * `String` - Человеко-читаемое описание ошибки /// /// # Пример /// ```json /// {"Error": "Document not found: 550e8400-e29b-41d4-a716-446655440000"} /// ``` Error(String), } /// Сообщение для репликации команд между узлами кластера /// Обеспечивает согласованное состояние данных на всех узлах. /// Каждое сообщение имеет последовательный номер для гарантии порядка доставки. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ReplicationMessage { /// Последовательный номер сообщения /// Увеличивается монотонно для каждого нового сообщения /// Используется для: /// 1. Гарантии порядка применения команд на репликах /// 2. Обнаружения пропущенных сообщений (gap detection) /// 3. Идемпотентной обработки (дублирование сообщений с тем же sequence игнорируется) pub sequence: u64, /// Команда для репликации /// Содержит операцию, которую нужно выполнить на всех репликах /// Все команды из перечисления Command могут быть реплицированы pub command: Command, /// Временная метка создания сообщения /// Используется для: /// 1. Отслеживания задержек репликации /// 2. Разрешения конфликтов (последняя запись побеждает) /// 3. Аудита и отладки /// Формат: Unix timestamp в секундах pub timestamp: i64, } /// Структура для информации о шарде (части данных на узле) /// Используется для мониторинга состояния распределенной системы /// и принятия решений о балансировке нагрузки. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShardInfo { /// Уникальный идентификатор узла в кластере /// Формат: строка, обычно "node_" + уникальный суффикс /// Пример: "node_1", "node_abc123" pub node_id: String, /// Сетевой адрес узла /// Формат: "IP:PORT" или "hostname:PORT" /// Пример: "127.0.0.1:8081", "db-node-1.example.com:8081" pub address: String, /// Общая емкость узла в байтах /// Максимальный объем данных, который может хранить узел /// Пример: 1073741824 (1GB), 5368709120 (5GB) pub capacity: u64, /// Используемая емкость узла в байтах /// Текущий объем данных, хранящихся на узле /// Всегда меньше или равен capacity pub used: u64, /// Список коллекций, хранящихся на узле /// Каждый узел может хранить несколько коллекций или части коллекций /// Пример: ["users", "orders", "logs"] pub collections: Vec, } /// Структура для статуса кластера /// Содержит полную информацию о состоянии распределенной системы. /// Используется для мониторинга, принятия решений о балансировке /// и отображения состояния администраторам. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClusterStatus { /// Список всех узлов в кластере с их текущим состоянием /// Каждый элемент содержит подробную информацию об узле pub nodes: Vec, /// Общая емкость кластера в байтах /// Сумма capacity всех узлов pub total_capacity: u64, /// Общая используемая емкость кластера в байтах /// Сумма used всех узлов pub total_used: u64, /// Флаг необходимости ребалансировки кластера /// true: распределение данных между узлами неравномерно /// false: данные распределены оптимально /// Определяется на основе разницы в использовании емкости между узлами pub rebalance_needed: bool, } /// Wait-Free сериализация сообщений с использованием MessagePack /// Преобразует структуры данных в компактный бинарный формат для передачи по сети. /// /// # Типовые параметры /// * `T` - Тип сериализуемого значения, должен реализовывать `serde::Serialize` /// /// # Аргументы /// * `value` - Ссылка на значение для сериализации /// /// # Возвращаемое значение /// * `crate::common::error::Result>` - Сериализованные данные или ошибка /// /// # Пример использования /// ```rust /// let command = Command::Create { /// collection: "users".to_string(), /// document: b"{\"name\":\"Alice\"}".to_vec(), /// }; /// /// let serialized = serialize(&command)?; /// // Отправка serialized по сети... /// ``` pub fn serialize(value: &T) -> crate::common::error::Result> { // Используем rmp_serde для сериализации в MessagePack // MessagePack выбран из-за: // 1. Компактности (меньше JSON) // 2. Быстроты сериализации/десериализации // 3. Поддержки бинарных данных без кодирования rmp_serde::to_vec(value) .map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string())) } /// Wait-Free десериализация сообщений с использованием MessagePack /// Преобразует бинарные данные обратно в структуры Rust. /// Обратная операция для функции `serialize`. /// /// # Типовые параметры /// * `T` - Тип десериализуемого значения, должен реализовывать `serde::Deserialize<'a>` /// /// # Аргументы /// * `bytes` - Ссылка на байты для десериализации /// /// # Возвращаемое значение /// * `crate::common::error::Result` - Десериализованное значение или ошибка /// /// # Пример использования /// ```rust /// // Получение bytes из сети... /// let command: Command = deserialize(&bytes)?; /// // Использование command... /// ``` pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::error::Result { // Используем rmp_serde для десериализации из MessagePack // Обрабатываем ошибки парсинга и несоответствия типов rmp_serde::from_slice(bytes) .map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string())) } // Дополнительные структуры для расширения функциональности (закомментированы для будущего использования) /* /// Структура для информации о репликационном лаге /// Используется для мониторинга отставания реплик от мастера #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ReplicationLag { pub node_id: String, pub lag_seconds: u64, pub last_applied_sequence: u64, } /// Структура для статистики выполнения команд /// Используется для мониторинга производительности и выявления узких мест #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommandStats { pub command_type: String, pub count: u64, pub avg_duration_ms: f64, pub error_rate: f64, } /// Структура для информации о здоровье узла /// Используется для проверки доступности и состояния узлов кластера #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HealthCheck { pub node_id: String, pub status: String, // "healthy", "degraded", "unhealthy" pub last_check: i64, pub details: String, } */ // Примечания по использованию протокола: // // 1. Сериализация: // - Все структуры автоматически сериализуются с помощью derive(Serialize) // - Используется MessagePack для максимальной эффективности // - Бинарный формат обеспечивает компактность и скорость // // 2. Обработка ошибок: // - Все ошибки сериализации/десериализации преобразуются в FalcotError // - Клиенты должны обрабатывать возможные ошибки сети и парсинга // // 3. Совместимость: // - Добавление новых вариантов Command обратно совместимо // - Удаление или изменение существующих вариантов требует миграции // - Все изменения в структурах должны тестироваться на совместимость // // 4. Производительность: // - MessagePack быстрее JSON для сериализации/десериализации // - Использование Vec для документов избегает лишних копирований // - Wait-free операции не блокируют потоки выполнения // // 5. Безопасность: // - Все входящие данные должны валидироваться перед использованием // - Ограничение максимального размера сообщений для предотвращения DoS // - Аутентификация и авторизация на более высоком уровне