diff --git a/internal/cluster/node.go b/internal/cluster/node.go new file mode 100644 index 0000000..049527b --- /dev/null +++ b/internal/cluster/node.go @@ -0,0 +1,612 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/cluster/node.go +// Назначение: Реализация узла кластера (node) для распределённой СУБД с поддержкой временных меток. +// Полностью lock-free с использованием атомарных операций. + +package cluster + +import ( + "encoding/json" + "fmt" + "net" + "sync/atomic" + "time" + + "futriis/internal/log" + "futriis/internal/storage" + "github.com/google/uuid" +) + +// NodeStatus представляет состояние узла кластера +type NodeStatus int32 + +const ( + StatusOffline NodeStatus = iota + StatusActive + StatusSyncing + StatusFailed +) + +// Node представляет отдельный узел в распределённой системе +type Node struct { + ID string // Уникальный идентификатор узла + IP string // IP-адрес узла + Port int // Порт для коммуникации + Status atomic.Int32 // Атомарный статус узла (NodeStatus) + Storage *storage.Storage + logger *log.Logger + coordinator *RaftCoordinator // Ссылка на координатора (теперь RaftCoordinator) + lastSeen atomic.Int64 // Время последнего heartbeat (Unix миллисекунды) + joinedAt atomic.Int64 // Время присоединения к кластеру + createdAt int64 // Время создания узла + startedAt int64 // Время последнего старта узла + stoppedAt int64 // Время остановки узла + incomingConn chan net.Conn // Канал для входящих соединений (wait-free) + stopChan chan struct{} + requestCount atomic.Uint64 // Счётчик обработанных запросов + bytesRx atomic.Uint64 // Получено байт + bytesTx atomic.Uint64 // Отправлено байт +} + +// NodeConfig содержит конфигурацию для создания узла +type NodeConfig struct { + IP string + Port int + Storage *storage.Storage + Logger *log.Logger + Coordinator *RaftCoordinator +} + +// NewNode создаёт новый экземпляр узла кластера +func NewNode(ip string, port int, store *storage.Storage, logger *log.Logger) *Node { + now := time.Now().UnixMilli() + node := &Node{ + ID: uuid.New().String(), + IP: ip, + Port: port, + Storage: store, + logger: logger, + incomingConn: make(chan net.Conn, 1000), // Буферизованный канал для wait-free приёма + stopChan: make(chan struct{}), + createdAt: now, + startedAt: now, + } + node.Status.Store(int32(StatusActive)) + node.lastSeen.Store(now) + node.joinedAt.Store(0) // Пока не присоединён к кластеру + + // Запуск сервера для приёма межузловых соединений + go node.startTCPServer() + + // Запуск обработчика входящих соединений + go node.handleIncomingConnections() + + // Запуск heartbeat-отправки (если координатор известен) + go node.heartbeatLoop() + + if logger != nil { + logger.Info(fmt.Sprintf("Node %s created at %s", node.ID, node.GetCreatedAtStr())) + } + + return node +} + +// startTCPServer запускает TCP-сервер для приёма запросов от других узлов +func (n *Node) startTCPServer() { + addr := fmt.Sprintf("%s:%d", n.IP, n.Port) + listener, err := net.Listen("tcp", addr) + if err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s failed to start TCP server: %v", n.ID, err)) + } + n.Status.Store(int32(StatusFailed)) + return + } + defer listener.Close() + + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Node %s listening on %s (started at %s)", n.ID, addr, n.GetStartedAtStr())) + } + + for { + select { + case <-n.stopChan: + return + default: + conn, err := listener.Accept() + if err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s accept error: %v", n.ID, err)) + } + continue + } + // Неблокирующая отправка в канал + select { + case n.incomingConn <- conn: + n.bytesRx.Add(1) + default: + if n.logger != nil { + n.logger.Warn(fmt.Sprintf("Node %s incoming connection queue full, dropping connection", n.ID)) + } + conn.Close() + } + } + } +} + +// handleIncomingConnections обрабатывает входящие соединения wait-free способом +func (n *Node) handleIncomingConnections() { + for { + select { + case <-n.stopChan: + return + case conn := <-n.incomingConn: + n.requestCount.Add(1) + go n.handleNodeRequest(conn) + } + } +} + +// handleNodeRequest обрабатывает конкретный запрос от другого узла +func (n *Node) handleNodeRequest(conn net.Conn) { + defer conn.Close() + + decoder := json.NewDecoder(conn) + var req NodeRequest + if err := decoder.Decode(&req); err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s failed to decode request: %v", n.ID, err)) + } + return + } + + // Обновляем время последнего контакта + n.lastSeen.Store(time.Now().UnixMilli()) + + if n.logger != nil { + n.logger.Debug(fmt.Sprintf("Node %s received request type %s from %s at %s", + n.ID, req.Type, req.FromNode, time.UnixMilli(req.Timestamp).Format("15:04:05.000"))) + } + + // Маршрутизация запроса в зависимости от типа + switch req.Type { + case "replicate": + n.handleReplicateRequest(req.Data) + case "query": + n.handleQueryRequest(req.Data, conn) + case "sync": + n.handleSyncRequest(req.Data, conn) + case "heartbeat": + n.handleHeartbeatRequest(req, conn) + default: + if n.logger != nil { + n.logger.Warn(fmt.Sprintf("Node %s unknown request type: %s", n.ID, req.Type)) + } + } +} + +// handleReplicateRequest обрабатывает запрос на репликацию документа +func (n *Node) handleReplicateRequest(data []byte) { + startTime := time.Now().UnixMilli() + + var repData struct { + Database string `json:"database"` + Collection string `json:"collection"` + Document map[string]interface{} `json:"document"` + SourceNode string `json:"source_node"` + ReplicaID string `json:"replica_id"` + } + + if err := json.Unmarshal(data, &repData); err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s failed to unmarshal replicate data: %v", n.ID, err)) + } + return + } + + // Получаем базу данных + db, err := n.Storage.GetDatabase(repData.Database) + if err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s database not found for replication: %s", n.ID, repData.Database)) + } + return + } + + // Получаем коллекцию + coll, err := db.GetCollection(repData.Collection) + if err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s collection not found for replication: %s", n.ID, repData.Collection)) + } + return + } + + // Создаём документ и вставляем + doc := &storage.Document{ + ID: repData.Document["_id"].(string), + Fields: repData.Document, + } + + if err := coll.Insert(doc); err != nil { + if n.logger != nil { + n.logger.Error(fmt.Sprintf("Node %s failed to replicate document: %v", n.ID, err)) + } + } else { + duration := time.Now().UnixMilli() - startTime + if n.logger != nil { + n.logger.Debug(fmt.Sprintf("Node %s replicated document %s from %s (took %d ms)", + n.ID, doc.ID, repData.SourceNode, duration)) + } + } +} + +// handleQueryRequest обрабатывает запрос на чтение данных с узла +func (n *Node) handleQueryRequest(data []byte, conn net.Conn) { + startTime := time.Now().UnixMilli() + + var queryData struct { + Database string `json:"database"` + Collection string `json:"collection"` + DocumentID string `json:"document_id"` + RequestID string `json:"request_id"` + } + + if err := json.Unmarshal(data, &queryData); err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + // Получаем базу данных + db, err := n.Storage.GetDatabase(queryData.Database) + if err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + // Получаем коллекцию + coll, err := db.GetCollection(queryData.Collection) + if err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + // Находим документ + doc, err := coll.Find(queryData.DocumentID) + if err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + duration := time.Now().UnixMilli() - startTime + + // Отправляем успешный ответ + response := map[string]interface{}{ + "status": "success", + "data": doc, + "node_id": n.ID, + "request_id": queryData.RequestID, + "duration_ms": duration, + "timestamp": time.Now().UnixMilli(), + } + encoder := json.NewEncoder(conn) + encoder.Encode(response) + + n.bytesTx.Add(uint64(len(response))) + + if n.logger != nil { + n.logger.Debug(fmt.Sprintf("Node %s handled query for %s.%s:%s (took %d ms)", + n.ID, queryData.Database, queryData.Collection, queryData.DocumentID, duration)) + } +} + +// handleSyncRequest обрабатывает запрос на синхронизацию всей коллекции +func (n *Node) handleSyncRequest(data []byte, conn net.Conn) { + startTime := time.Now().UnixMilli() + + var syncData struct { + Database string `json:"database"` + Collection string `json:"collection"` + RequestID string `json:"request_id"` + Since int64 `json:"since"` // Синхронизация с указанного времени + } + + if err := json.Unmarshal(data, &syncData); err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + // Получаем базу данных + db, err := n.Storage.GetDatabase(syncData.Database) + if err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + // Получаем коллекцию + coll, err := db.GetCollection(syncData.Collection) + if err != nil { + n.sendErrorResponse(conn, err.Error()) + return + } + + // Получаем все документы + docs := coll.GetAllDocuments() + + // Фильтруем по времени, если указано + if syncData.Since > 0 { + filtered := make([]*storage.Document, 0) + for _, doc := range docs { + if doc.UpdatedAt > syncData.Since { + filtered = append(filtered, doc) + } + } + docs = filtered + } + + duration := time.Now().UnixMilli() - startTime + + response := map[string]interface{}{ + "status": "success", + "docs": docs, + "count": len(docs), + "node_id": n.ID, + "request_id": syncData.RequestID, + "duration_ms": duration, + "timestamp": time.Now().UnixMilli(), + "sync_duration_ms": duration, + } + encoder := json.NewEncoder(conn) + encoder.Encode(response) + + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Node %s synced %d documents from %s.%s (took %d ms)", + n.ID, len(docs), syncData.Database, syncData.Collection, duration)) + } +} + +// handleHeartbeatRequest обрабатывает heartbeat запрос +func (n *Node) handleHeartbeatRequest(req NodeRequest, conn net.Conn) { + n.lastSeen.Store(time.Now().UnixMilli()) + + response := map[string]interface{}{ + "status": "alive", + "node_id": n.ID, + "timestamp": time.Now().UnixMilli(), + "uptime_ms": time.Now().UnixMilli() - n.startedAt, + } + encoder := json.NewEncoder(conn) + encoder.Encode(response) +} + +// sendErrorResponse отправляет ошибку в ответ на запрос +func (n *Node) sendErrorResponse(conn net.Conn, errMsg string) { + response := map[string]interface{}{ + "status": "error", + "error": errMsg, + "node_id": n.ID, + "timestamp": time.Now().UnixMilli(), + } + encoder := json.NewEncoder(conn) + encoder.Encode(response) +} + +// heartbeatLoop отправляет периодические сигналы жизни координатору +func (n *Node) heartbeatLoop() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-n.stopChan: + return + case <-ticker.C: + if n.coordinator != nil { + n.coordinator.SendHeartbeat(n.ID) + n.lastSeen.Store(time.Now().UnixMilli()) + if n.logger != nil { + n.logger.Debug(fmt.Sprintf("Node %s sent heartbeat at %s", n.ID, n.GetLastSeenStr())) + } + } + } + } +} + +// GetNodeStatus возвращает текущий статус узла (атомарно) +func (n *Node) GetNodeStatus() NodeStatus { + return NodeStatus(n.Status.Load()) +} + +// IsActive проверяет, активен ли узел +func (n *Node) IsActive() bool { + return NodeStatus(n.Status.Load()) == StatusActive +} + +// SetCoordinator устанавливает координатора для узла +func (n *Node) SetCoordinator(coord *RaftCoordinator) { + n.coordinator = coord + now := time.Now().UnixMilli() + n.joinedAt.Store(now) + + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Node %s joined cluster at %s", n.ID, n.GetJoinedAtStr())) + } +} + +// JoinCluster присоединяет узел к кластеру +func (n *Node) JoinCluster(coord *RaftCoordinator) error { + if n.coordinator != nil { + return fmt.Errorf("node already joined to cluster") + } + + n.SetCoordinator(coord) + + // Регистрируем узел в координаторе + if err := coord.RegisterNode(n); err != nil { + return fmt.Errorf("failed to register node: %v", err) + } + + n.Status.Store(int32(StatusActive)) + + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Node %s successfully joined cluster at %s", n.ID, n.GetJoinedAtStr())) + } + + return nil +} + +// LeaveCluster покидает кластер +func (n *Node) LeaveCluster() error { + if n.coordinator == nil { + return fmt.Errorf("node not in cluster") + } + + n.Status.Store(int32(StatusOffline)) + + if err := n.coordinator.RemoveNode(n.ID); err != nil { + n.logger.Warn(fmt.Sprintf("Failed to remove node from coordinator: %v", err)) + } + + n.coordinator = nil + n.joinedAt.Store(0) + + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Node %s left cluster at %s", n.ID, time.Now().Format("2006-01-02 15:04:05.000"))) + } + + return nil +} + +// GetLastSeen возвращает время последнего контакта +func (n *Node) GetLastSeen() int64 { + return n.lastSeen.Load() +} + +// GetLastSeenStr возвращает человекочитаемое время последнего контакта +func (n *Node) GetLastSeenStr() string { + lastSeen := n.lastSeen.Load() + if lastSeen == 0 { + return "never" + } + return time.UnixMilli(lastSeen).Format("2006-01-02 15:04:05.000") +} + +// GetJoinedAt возвращает время присоединения к кластеру +func (n *Node) GetJoinedAt() int64 { + return n.joinedAt.Load() +} + +// GetJoinedAtStr возвращает человекочитаемое время присоединения +func (n *Node) GetJoinedAtStr() string { + joinedAt := n.joinedAt.Load() + if joinedAt == 0 { + return "not joined" + } + return time.UnixMilli(joinedAt).Format("2006-01-02 15:04:05.000") +} + +// GetStartedAt возвращает время старта узла +func (n *Node) GetStartedAt() int64 { + return n.startedAt +} + +// GetStartedAtStr возвращает человекочитаемое время старта +func (n *Node) GetStartedAtStr() string { + return time.UnixMilli(n.startedAt).Format("2006-01-02 15:04:05.000") +} + +// GetCreatedAt возвращает время создания узла +func (n *Node) GetCreatedAt() int64 { + return n.createdAt +} + +// GetCreatedAtStr возвращает человекочитаемое время создания +func (n *Node) GetCreatedAtStr() string { + return time.UnixMilli(n.createdAt).Format("2006-01-02 15:04:05.000") +} + +// GetUptime возвращает время работы узла +func (n *Node) GetUptime() time.Duration { + if n.startedAt == 0 { + return 0 + } + return time.Duration(time.Now().UnixMilli()-n.startedAt) * time.Millisecond +} + +// GetStats возвращает статистику узла +func (n *Node) GetStats() map[string]interface{} { + return map[string]interface{}{ + "id": n.ID, + "ip": n.IP, + "port": n.Port, + "status": n.GetNodeStatus(), + "created_at": n.GetCreatedAtStr(), + "started_at": n.GetStartedAtStr(), + "joined_at": n.GetJoinedAtStr(), + "last_seen": n.GetLastSeenStr(), + "uptime": n.GetUptime().String(), + "request_count": n.requestCount.Load(), + "bytes_rx": n.bytesRx.Load(), + "bytes_tx": n.bytesTx.Load(), + } +} + +// Stop останавливает работу узла +func (n *Node) Stop() { + n.Status.Store(int32(StatusOffline)) + n.stoppedAt = time.Now().UnixMilli() + close(n.stopChan) + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Node %s stopped at %s", n.ID, time.UnixMilli(n.stoppedAt).Format("2006-01-02 15:04:05.000"))) + } +} + +// GetAddress возвращает адрес узла в формате "ip:port" +func (n *Node) GetAddress() string { + return fmt.Sprintf("%s:%d", n.IP, n.Port) +} + +// ReplicateDocument отправляет документ на репликацию всем активным узлам +func (n *Node) ReplicateDocument(database, collection string, doc *storage.Document) error { + if n.coordinator == nil { + if n.logger != nil { + n.logger.Warn("No coordinator set, skipping replication") + } + return fmt.Errorf("no coordinator set") + } + + // Получаем список всех узлов от координатора + nodes := n.coordinator.GetActiveNodes() + + startTime := time.Now().UnixMilli() + + for _, nodeInfo := range nodes { + if nodeInfo.ID == n.ID { + continue // Пропускаем себя + } + + // В реальной реализации здесь была бы отправка на узел + if n.logger != nil { + n.logger.Debug(fmt.Sprintf("Would replicate to node %s (last seen: %s)", + nodeInfo.ID, time.UnixMilli(nodeInfo.LastSeen).Format("15:04:05.000"))) + } + } + + duration := time.Now().UnixMilli() - startTime + + if n.logger != nil { + n.logger.Info(fmt.Sprintf("Replicated document %s to %d nodes (took %d ms)", + doc.ID, len(nodes)-1, duration)) + } + + return nil +}