futriix/protocol.rs

691 lines
30 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// src/common/protocol.rs
//! Протокол обмена данными для Futriix с wait-free архитектурой
//!
//! Этот модуль определяет структуры команд и ответов для взаимодействия между
//! компонентами системы Futriix. Используется для сериализации и десериализации
//! сообщений между клиентами и сервером, а также между узлами кластера.
//!
//! Основные компоненты:
//! - Command: Перечисление всех поддерживаемых команд базы данных
//! - Response: Перечисление возможных ответов от сервера
//! - ReplicationMessage: Структура для репликации команд между узлами
//! - Шардинг и кластерные структуры (ShardInfo, ClusterStatus, RaftNodeInfo)
//! - Функции сериализации/десериализации с использованием MessagePack
//!
//! Особенности:
//! - Wait-free сериализация с использованием MessagePack (rmp-serde)
//! - Поддержка всех операций CRUD, транзакций, индексов и шардинга
//! - Типизированные структуры для строгой проверки данных
//! - Поддержка репликации с последовательными номерами и временными метками
#![allow(dead_code)] // Разрешаем неиспользуемый код для будущего расширения
use serde::{Deserialize, Serialize};
use crate::server::database::Index; // Тип индекса из модуля базы данных
/// Команды для выполнения в базе данных Futriix
/// Используются для передачи операций от клиентов к серверу и между узлами кластера.
/// Каждая команда содержит все необходимые данные для выполнения соответствующей операции.
/// Сериализуется в MessagePack для компактной передачи по сети.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
/// Создание нового документа в указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции, в которую будет добавлен документ
/// * `document` - Сериализованный документ в формате JSON (Vec<u8>)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "document": {"name": "Alice", "age": 30}
/// }
/// ```
Create {
collection: String,
document: Vec<u8>,
},
/// Чтение документа по идентификатору из указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции, из которой будет прочитан документ
/// * `id` - Уникальный идентификатор документа (обычно UUID)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "id": "550e8400-e29b-41d4-a716-446655440000"
/// }
/// ```
Read {
collection: String,
id: String,
},
/// Обновление существующего документа по идентификатору
///
/// # Поля
/// * `collection` - Имя коллекции, содержащей документ
/// * `id` - Уникальный идентификатор обновляемого документа
/// * `document` - Новая версия документа в формате JSON (Vec<u8>)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "id": "550e8400-e29b-41d4-a716-446655440000",
/// "document": {"name": "Alice", "age": 31}
/// }
/// ```
Update {
collection: String,
id: String,
document: Vec<u8>,
},
/// Удаление документа по идентификатору из указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции, из которой будет удален документ
/// * `id` - Уникальный идентификатор удаляемого документа
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "id": "550e8400-e29b-41d4-a716-446655440000"
/// }
/// ```
Delete {
collection: String,
id: String,
},
/// Запрос документов из коллекции с применением фильтра
///
/// # Поля
/// * `collection` - Имя коллекции для поиска
/// * `filter` - Сериализованный фильтр в формате JSON (Vec<u8>)
/// Если пустой вектор - возвращаются все документы
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "filter": {"age": {"$gt": 25}}
/// }
/// ```
Query {
collection: String,
filter: Vec<u8>,
},
/// Создание хранимой процедуры (Lua скрипта) на сервере
///
/// # Поля
/// * `name` - Уникальное имя процедуры для последующего вызова
/// * `code` - Lua код процедуры в виде вектора байтов
///
/// # Пример использования
/// ```json
/// {
/// "name": "calculate_stats",
/// "code": "function calculate() return 42 end"
/// }
/// ```
CreateProcedure {
name: String,
code: Vec<u8>,
},
/// Вызов ранее созданной хранимой процедуры
///
/// # Поля
/// * `name` - Имя процедуры для вызова
///
/// # Пример использования
/// ```json
/// {
/// "name": "calculate_stats"
/// }
/// ```
CallProcedure {
name: String,
},
/// Начало новой транзакции
/// Создает контекст транзакции для атомарного выполнения группы команд
///
/// # Поля
/// * `transaction_id` - Уникальный идентификатор транзакции
/// Используется для коммита или отката
///
/// # Пример использования
/// ```json
/// {
/// "transaction_id": "tx_user_update_001"
/// }
/// ```
BeginTransaction {
transaction_id: String,
},
/// Подтверждение (коммит) транзакции
/// Выполняет все команды в транзакции атомарно
///
/// # Поля
/// * `transaction_id` - Идентификатор транзакции для коммита
///
/// # Пример использования
/// ```json
/// {
/// "transaction_id": "tx_user_update_001"
/// }
/// ```
CommitTransaction {
transaction_id: String,
},
/// Откат (rollback) транзакции
/// Отменяет все изменения, сделанные в рамках транзакции
///
/// # Поля
/// * `transaction_id` - Идентификатор транзакции для отката
///
/// # Пример использования
/// ```json
/// {
/// "transaction_id": "tx_user_update_001"
/// }
/// ```
RollbackTransaction {
transaction_id: String,
},
/// Создание индекса для указанной коллекции
/// Индекс ускоряет поиск документов по определенным полям
///
/// # Поля
/// * `collection` - Имя коллекции, для которой создается индекс
/// * `index` - Структура индекса, определяющая параметры индексации
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "index": {
/// "name": "email_idx",
/// "index_type": "Secondary",
/// "field": "email",
/// "unique": true
/// }
/// }
/// ```
CreateIndex {
collection: String,
index: Index,
},
/// Запрос документов по значению индекса
/// Использует ранее созданный индекс для быстрого поиска
///
/// # Поля
/// * `collection` - Имя коллекции для поиска
/// * `index_name` - Имя индекса для использования
/// * `value` - Значение для поиска в индексе (сериализованное)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "index_name": "email_idx",
/// "value": "alice@example.com"
/// }
/// ```
QueryByIndex {
collection: String,
index_name: String,
value: Vec<u8>,
},
/// Добавление нового узла в кластер шардинга
/// Используется для горизонтального масштабирования системы
///
/// # Поля
/// * `node_id` - Уникальный идентификатор узла в кластере
/// * `address` - Сетевой адрес узла (например, "127.0.0.1:8081")
/// * `capacity` - Максимальная емкость узла в байтах
///
/// # Пример использования
/// ```json
/// {
/// "node_id": "node_1",
/// "address": "127.0.0.1:8081",
/// "capacity": 1073741824
/// }
/// ```
AddShardNode {
node_id: String,
address: String,
capacity: u64,
},
/// Удаление узла из кластера шардинга
/// Перед удалением рекомендуется мигрировать данные с узла
///
/// # Поля
/// * `node_id` - Идентификатор удаляемого узла
///
/// # Пример использования
/// ```json
/// {
/// "node_id": "node_1"
/// }
/// ```
RemoveShardNode {
node_id: String,
},
/// Миграция шарда (части данных) между узлами кластера
/// Используется для ребалансировки нагрузки и обслуживания узлов
///
/// # Поля
/// * `collection` - Имя коллекции, данные которой мигрируются
/// * `from_node` - Идентификатор исходного узла
/// * `to_node` - Идентификатор целевого узла
/// * `shard_key` - Ключ шарда для миграции (определяет диапазон данных)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "from_node": "node_1",
/// "to_node": "node_2",
/// "shard_key": "user_id"
/// }
/// ```
MigrateShard {
collection: String,
from_node: String,
to_node: String,
shard_key: String,
},
/// Команда ребалансировки кластера
/// Автоматически распределяет данные между узлами для оптимальной нагрузки
/// Не требует параметров - работает на основе текущего состояния кластера
///
/// # Пример использования
/// ```json
/// "RebalanceCluster"
/// ```
RebalanceCluster,
/// Получение текущего статуса кластера
/// Возвращает информацию о всех узлах, их загрузке и состоянии
/// Не требует параметров
///
/// # Пример использования
/// ```json
/// "GetClusterStatus"
/// ```
GetClusterStatus,
/// Добавление ограничения (constraint) к коллекции
/// Ограничения обеспечивают целостность данных (уникальность, проверки и т.д.)
///
/// # Поля
/// * `collection` - Имя коллекции для добавления ограничения
/// * `constraint_name` - Уникальное имя ограничения
/// * `constraint_type` - Тип ограничения ("unique", "not_null", "check" и т.д.)
/// * `field` - Поле, к которому применяется ограничение
/// * `value` - Значение ограничения (зависит от типа)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "constraint_name": "unique_email",
/// "constraint_type": "unique",
/// "field": "email",
/// "value": ""
/// }
/// ```
AddConstraint {
collection: String,
constraint_name: String,
constraint_type: String,
field: String,
value: Vec<u8>,
},
/// Удаление ограничения из коллекции
///
/// # Поля
/// * `collection` - Имя коллекции
/// * `constraint_name` - Имя удаляемого ограничения
///
/// # Пример использования
/// ```json
/// {
/// "collection": "users",
/// "constraint_name": "unique_email"
/// }
/// ```
RemoveConstraint {
collection: String,
constraint_name: String,
},
/// Включение сжатия данных для указанной коллекции
/// Уменьшает объем хранимых данных за счет CPU ресурсов
///
/// # Поля
/// * `collection` - Имя коллекции
/// * `algorithm` - Алгоритм сжатия ("gzip", "lz4", "zstd" и т.д.)
///
/// # Пример использования
/// ```json
/// {
/// "collection": "logs",
/// "algorithm": "gzip"
/// }
/// ```
EnableCompression {
collection: String,
algorithm: String,
},
/// Отключение сжатия данных для указанной коллекции
///
/// # Поля
/// * `collection` - Имя коллекции
///
/// # Пример использования
/// ```json
/// {
/// "collection": "logs"
/// }
/// ```
DisableCompression {
collection: String,
},
/// Создание глобального индекса (распределенного по всему кластеру)
/// Позволяет выполнять быстрый поиск по всем узлам кластера
///
/// # Поля
/// * `name` - Уникальное имя глобального индекса
/// * `field` - Поле для индексации
/// * `unique` - Флаг уникальности индекса
///
/// # Пример использования
/// ```json
/// {
/// "name": "global_user_email",
/// "field": "email",
/// "unique": true
/// }
/// ```
CreateGlobalIndex {
name: String,
field: String,
unique: bool,
},
/// Запрос по глобальному индексу
/// Выполняет поиск по всем узлам кластера с использованием глобального индекса
///
/// # Поля
/// * `index_name` - Имя глобального индекса
/// * `value` - Значение для поиска (сериализованное)
///
/// # Пример использования
/// ```json
/// {
/// "index_name": "global_user_email",
/// "value": "alice@example.com"
/// }
/// ```
QueryGlobalIndex {
index_name: String,
value: Vec<u8>,
},
}
/// Ответы от базы данных Futriix
/// Используются для возврата результатов выполнения команд клиентам.
/// Каждый ответ содержит либо успешный результат с данными, либо описание ошибки.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
/// Успешное выполнение команды с возвращаемыми данными
///
/// # Параметры
/// * `Vec<u8>` - Сериализованные данные результата
/// Формат зависит от выполненной команды:
/// - Для Create: ID созданного документа (String)
/// - Для Read: документ в формате JSON
/// - Для Query: массив документов в формате JSON
/// - Для QueryByIndex: массив ID документов
///
/// # Пример
/// ```json
/// {"Success": "550e8400-e29b-41d4-a716-446655440000"}
/// ```
Success(Vec<u8>),
/// Ошибка выполнения команды с описанием проблемы
///
/// # Параметры
/// * `String` - Человеко-читаемое описание ошибки
///
/// # Пример
/// ```json
/// {"Error": "Document not found: 550e8400-e29b-41d4-a716-446655440000"}
/// ```
Error(String),
}
/// Сообщение для репликации команд между узлами кластера
/// Обеспечивает согласованное состояние данных на всех узлах.
/// Каждое сообщение имеет последовательный номер для гарантии порядка доставки.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationMessage {
/// Последовательный номер сообщения
/// Увеличивается монотонно для каждого нового сообщения
/// Используется для:
/// 1. Гарантии порядка применения команд на репликах
/// 2. Обнаружения пропущенных сообщений (gap detection)
/// 3. Идемпотентной обработки (дублирование сообщений с тем же sequence игнорируется)
pub sequence: u64,
/// Команда для репликации
/// Содержит операцию, которую нужно выполнить на всех репликах
/// Все команды из перечисления Command могут быть реплицированы
pub command: Command,
/// Временная метка создания сообщения
/// Используется для:
/// 1. Отслеживания задержек репликации
/// 2. Разрешения конфликтов (последняя запись побеждает)
/// 3. Аудита и отладки
/// Формат: Unix timestamp в секундах
pub timestamp: i64,
}
/// Структура для информации о шарде (части данных на узле)
/// Используется для мониторинга состояния распределенной системы
/// и принятия решений о балансировке нагрузки.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardInfo {
/// Уникальный идентификатор узла в кластере
/// Формат: строка, обычно "node_" + уникальный суффикс
/// Пример: "node_1", "node_abc123"
pub node_id: String,
/// Сетевой адрес узла
/// Формат: "IP:PORT" или "hostname:PORT"
/// Пример: "127.0.0.1:8081", "db-node-1.example.com:8081"
pub address: String,
/// Общая емкость узла в байтах
/// Максимальный объем данных, который может хранить узел
/// Пример: 1073741824 (1GB), 5368709120 (5GB)
pub capacity: u64,
/// Используемая емкость узла в байтах
/// Текущий объем данных, хранящихся на узле
/// Всегда меньше или равен capacity
pub used: u64,
/// Список коллекций, хранящихся на узле
/// Каждый узел может хранить несколько коллекций или части коллекций
/// Пример: ["users", "orders", "logs"]
pub collections: Vec<String>,
}
/// Структура для статуса кластера
/// Содержит полную информацию о состоянии распределенной системы.
/// Используется для мониторинга, принятия решений о балансировке
/// и отображения состояния администраторам.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
/// Список всех узлов в кластере с их текущим состоянием
/// Каждый элемент содержит подробную информацию об узле
pub nodes: Vec<ShardInfo>,
/// Общая емкость кластера в байтах
/// Сумма capacity всех узлов
pub total_capacity: u64,
/// Общая используемая емкость кластера в байтах
/// Сумма used всех узлов
pub total_used: u64,
/// Флаг необходимости ребалансировки кластера
/// true: распределение данных между узлами неравномерно
/// false: данные распределены оптимально
/// Определяется на основе разницы в использовании емкости между узлами
pub rebalance_needed: bool,
}
/// Wait-Free сериализация сообщений с использованием MessagePack
/// Преобразует структуры данных в компактный бинарный формат для передачи по сети.
///
/// # Типовые параметры
/// * `T` - Тип сериализуемого значения, должен реализовывать `serde::Serialize`
///
/// # Аргументы
/// * `value` - Ссылка на значение для сериализации
///
/// # Возвращаемое значение
/// * `crate::common::error::Result<Vec<u8>>` - Сериализованные данные или ошибка
///
/// # Пример использования
/// ```rust
/// let command = Command::Create {
/// collection: "users".to_string(),
/// document: b"{\"name\":\"Alice\"}".to_vec(),
/// };
///
/// let serialized = serialize(&command)?;
/// // Отправка serialized по сети...
/// ```
pub fn serialize<T: serde::Serialize>(value: &T) -> crate::common::error::Result<Vec<u8>> {
// Используем rmp_serde для сериализации в MessagePack
// MessagePack выбран из-за:
// 1. Компактности (меньше JSON)
// 2. Быстроты сериализации/десериализации
// 3. Поддержки бинарных данных без кодирования
rmp_serde::to_vec(value)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
/// Wait-Free десериализация сообщений с использованием MessagePack
/// Преобразует бинарные данные обратно в структуры Rust.
/// Обратная операция для функции `serialize`.
///
/// # Типовые параметры
/// * `T` - Тип десериализуемого значения, должен реализовывать `serde::Deserialize<'a>`
///
/// # Аргументы
/// * `bytes` - Ссылка на байты для десериализации
///
/// # Возвращаемое значение
/// * `crate::common::error::Result<T>` - Десериализованное значение или ошибка
///
/// # Пример использования
/// ```rust
/// // Получение bytes из сети...
/// let command: Command = deserialize(&bytes)?;
/// // Использование command...
/// ```
pub fn deserialize<'a, T: serde::Deserialize<'a>>(bytes: &'a [u8]) -> crate::common::error::Result<T> {
// Используем rmp_serde для десериализации из MessagePack
// Обрабатываем ошибки парсинга и несоответствия типов
rmp_serde::from_slice(bytes)
.map_err(|e| crate::common::error::FalcotError::SerializationError(e.to_string()))
}
// Дополнительные структуры для расширения функциональности (закомментированы для будущего использования)
/*
/// Структура для информации о репликационном лаге
/// Используется для мониторинга отставания реплик от мастера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationLag {
pub node_id: String,
pub lag_seconds: u64,
pub last_applied_sequence: u64,
}
/// Структура для статистики выполнения команд
/// Используется для мониторинга производительности и выявления узких мест
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandStats {
pub command_type: String,
pub count: u64,
pub avg_duration_ms: f64,
pub error_rate: f64,
}
/// Структура для информации о здоровье узла
/// Используется для проверки доступности и состояния узлов кластера
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheck {
pub node_id: String,
pub status: String, // "healthy", "degraded", "unhealthy"
pub last_check: i64,
pub details: String,
}
*/
// Примечания по использованию протокола:
//
// 1. Сериализация:
// - Все структуры автоматически сериализуются с помощью derive(Serialize)
// - Используется MessagePack для максимальной эффективности
// - Бинарный формат обеспечивает компактность и скорость
//
// 2. Обработка ошибок:
// - Все ошибки сериализации/десериализации преобразуются в FalcotError
// - Клиенты должны обрабатывать возможные ошибки сети и парсинга
//
// 3. Совместимость:
// - Добавление новых вариантов Command обратно совместимо
// - Удаление или изменение существующих вариантов требует миграции
// - Все изменения в структурах должны тестироваться на совместимость
//
// 4. Производительность:
// - MessagePack быстрее JSON для сериализации/десериализации
// - Использование Vec<u8> для документов избегает лишних копирований
// - Wait-free операции не блокируют потоки выполнения
//
// 5. Безопасность:
// - Все входящие данные должны валидироваться перед использованием
// - Ограничение максимального размера сообщений для предотвращения DoS
// - Аутентификация и авторизация на более высоком уровне