/* * Copyright 2026 Safronov Grigorii * * Licensed under the CDDL, Version 1.0 (the "License"); * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * https://opensource.org/licenses/CDDL-1.0 */ // Файл: internal/cluster/node.go // Назначение: Реализация узла кластера (node) для распределённой СУБД с поддержкой временных меток. // Полностью lock-free с использованием атомарных операций. package cluster import ( "encoding/json" "fmt" "net" "sync/atomic" "time" "futriis/internal/log" "futriis/internal/storage" "github.com/google/uuid" ) // NodeStatus представляет состояние узла кластера type NodeStatus int32 const ( StatusOffline NodeStatus = iota StatusActive StatusSyncing StatusFailed ) // Node представляет отдельный узел в распределённой системе type Node struct { ID string // Уникальный идентификатор узла IP string // IP-адрес узла Port int // Порт для коммуникации Status atomic.Int32 // Атомарный статус узла (NodeStatus) Storage *storage.Storage logger *log.Logger coordinator *RaftCoordinator // Ссылка на координатора (теперь RaftCoordinator) lastSeen atomic.Int64 // Время последнего heartbeat (Unix миллисекунды) joinedAt atomic.Int64 // Время присоединения к кластеру createdAt int64 // Время создания узла startedAt int64 // Время последнего старта узла stoppedAt int64 // Время остановки узла incomingConn chan net.Conn // Канал для входящих соединений (wait-free) stopChan chan struct{} requestCount atomic.Uint64 // Счётчик обработанных запросов bytesRx atomic.Uint64 // Получено байт bytesTx atomic.Uint64 // Отправлено байт } // NodeConfig содержит конфигурацию для создания узла type NodeConfig struct { IP string Port int Storage *storage.Storage Logger *log.Logger Coordinator *RaftCoordinator } // NewNode создаёт новый экземпляр узла кластера func NewNode(ip string, port int, store *storage.Storage, logger *log.Logger) *Node { now := time.Now().UnixMilli() node := &Node{ ID: uuid.New().String(), IP: ip, Port: port, Storage: store, logger: logger, incomingConn: make(chan net.Conn, 1000), // Буферизованный канал для wait-free приёма stopChan: make(chan struct{}), createdAt: now, startedAt: now, } node.Status.Store(int32(StatusActive)) node.lastSeen.Store(now) node.joinedAt.Store(0) // Пока не присоединён к кластеру // Запуск сервера для приёма межузловых соединений go node.startTCPServer() // Запуск обработчика входящих соединений go node.handleIncomingConnections() // Запуск heartbeat-отправки (если координатор известен) go node.heartbeatLoop() if logger != nil { logger.Info(fmt.Sprintf("Node %s created at %s", node.ID, node.GetCreatedAtStr())) } return node } // startTCPServer запускает TCP-сервер для приёма запросов от других узлов func (n *Node) startTCPServer() { addr := fmt.Sprintf("%s:%d", n.IP, n.Port) listener, err := net.Listen("tcp", addr) if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to start TCP server: %v", n.ID, err)) } n.Status.Store(int32(StatusFailed)) return } defer listener.Close() if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s listening on %s (started at %s)", n.ID, addr, n.GetStartedAtStr())) } for { select { case <-n.stopChan: return default: conn, err := listener.Accept() if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s accept error: %v", n.ID, err)) } continue } // Неблокирующая отправка в канал select { case n.incomingConn <- conn: n.bytesRx.Add(1) default: if n.logger != nil { n.logger.Warn(fmt.Sprintf("Node %s incoming connection queue full, dropping connection", n.ID)) } conn.Close() } } } } // handleIncomingConnections обрабатывает входящие соединения wait-free способом func (n *Node) handleIncomingConnections() { for { select { case <-n.stopChan: return case conn := <-n.incomingConn: n.requestCount.Add(1) go n.handleNodeRequest(conn) } } } // handleNodeRequest обрабатывает конкретный запрос от другого узла func (n *Node) handleNodeRequest(conn net.Conn) { defer conn.Close() decoder := json.NewDecoder(conn) var req NodeRequest if err := decoder.Decode(&req); err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to decode request: %v", n.ID, err)) } return } // Обновляем время последнего контакта n.lastSeen.Store(time.Now().UnixMilli()) if n.logger != nil { n.logger.Debug(fmt.Sprintf("Node %s received request type %s from %s at %s", n.ID, req.Type, req.FromNode, time.UnixMilli(req.Timestamp).Format("15:04:05.000"))) } // Маршрутизация запроса в зависимости от типа switch req.Type { case "replicate": n.handleReplicateRequest(req.Data) case "query": n.handleQueryRequest(req.Data, conn) case "sync": n.handleSyncRequest(req.Data, conn) case "heartbeat": n.handleHeartbeatRequest(req, conn) default: if n.logger != nil { n.logger.Warn(fmt.Sprintf("Node %s unknown request type: %s", n.ID, req.Type)) } } } // handleReplicateRequest обрабатывает запрос на репликацию документа func (n *Node) handleReplicateRequest(data []byte) { startTime := time.Now().UnixMilli() var repData struct { Database string `json:"database"` Collection string `json:"collection"` Document map[string]interface{} `json:"document"` SourceNode string `json:"source_node"` ReplicaID string `json:"replica_id"` } if err := json.Unmarshal(data, &repData); err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to unmarshal replicate data: %v", n.ID, err)) } return } // Получаем базу данных db, err := n.Storage.GetDatabase(repData.Database) if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s database not found for replication: %s", n.ID, repData.Database)) } return } // Получаем коллекцию coll, err := db.GetCollection(repData.Collection) if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s collection not found for replication: %s", n.ID, repData.Collection)) } return } // Создаём документ и вставляем doc := &storage.Document{ ID: repData.Document["_id"].(string), Fields: repData.Document, } if err := coll.Insert(doc); err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to replicate document: %v", n.ID, err)) } } else { duration := time.Now().UnixMilli() - startTime if n.logger != nil { n.logger.Debug(fmt.Sprintf("Node %s replicated document %s from %s (took %d ms)", n.ID, doc.ID, repData.SourceNode, duration)) } } } // handleQueryRequest обрабатывает запрос на чтение данных с узла func (n *Node) handleQueryRequest(data []byte, conn net.Conn) { startTime := time.Now().UnixMilli() var queryData struct { Database string `json:"database"` Collection string `json:"collection"` DocumentID string `json:"document_id"` RequestID string `json:"request_id"` } if err := json.Unmarshal(data, &queryData); err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем базу данных db, err := n.Storage.GetDatabase(queryData.Database) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем коллекцию coll, err := db.GetCollection(queryData.Collection) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Находим документ doc, err := coll.Find(queryData.DocumentID) if err != nil { n.sendErrorResponse(conn, err.Error()) return } duration := time.Now().UnixMilli() - startTime // Отправляем успешный ответ response := map[string]interface{}{ "status": "success", "data": doc, "node_id": n.ID, "request_id": queryData.RequestID, "duration_ms": duration, "timestamp": time.Now().UnixMilli(), } encoder := json.NewEncoder(conn) encoder.Encode(response) n.bytesTx.Add(uint64(len(response))) if n.logger != nil { n.logger.Debug(fmt.Sprintf("Node %s handled query for %s.%s:%s (took %d ms)", n.ID, queryData.Database, queryData.Collection, queryData.DocumentID, duration)) } } // handleSyncRequest обрабатывает запрос на синхронизацию всей коллекции func (n *Node) handleSyncRequest(data []byte, conn net.Conn) { startTime := time.Now().UnixMilli() var syncData struct { Database string `json:"database"` Collection string `json:"collection"` RequestID string `json:"request_id"` Since int64 `json:"since"` // Синхронизация с указанного времени } if err := json.Unmarshal(data, &syncData); err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем базу данных db, err := n.Storage.GetDatabase(syncData.Database) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем коллекцию coll, err := db.GetCollection(syncData.Collection) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем все документы docs := coll.GetAllDocuments() // Фильтруем по времени, если указано if syncData.Since > 0 { filtered := make([]*storage.Document, 0) for _, doc := range docs { if doc.UpdatedAt > syncData.Since { filtered = append(filtered, doc) } } docs = filtered } duration := time.Now().UnixMilli() - startTime response := map[string]interface{}{ "status": "success", "docs": docs, "count": len(docs), "node_id": n.ID, "request_id": syncData.RequestID, "duration_ms": duration, "timestamp": time.Now().UnixMilli(), "sync_duration_ms": duration, } encoder := json.NewEncoder(conn) encoder.Encode(response) if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s synced %d documents from %s.%s (took %d ms)", n.ID, len(docs), syncData.Database, syncData.Collection, duration)) } } // handleHeartbeatRequest обрабатывает heartbeat запрос func (n *Node) handleHeartbeatRequest(req NodeRequest, conn net.Conn) { n.lastSeen.Store(time.Now().UnixMilli()) response := map[string]interface{}{ "status": "alive", "node_id": n.ID, "timestamp": time.Now().UnixMilli(), "uptime_ms": time.Now().UnixMilli() - n.startedAt, } encoder := json.NewEncoder(conn) encoder.Encode(response) } // sendErrorResponse отправляет ошибку в ответ на запрос func (n *Node) sendErrorResponse(conn net.Conn, errMsg string) { response := map[string]interface{}{ "status": "error", "error": errMsg, "node_id": n.ID, "timestamp": time.Now().UnixMilli(), } encoder := json.NewEncoder(conn) encoder.Encode(response) } // heartbeatLoop отправляет периодические сигналы жизни координатору func (n *Node) heartbeatLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-n.stopChan: return case <-ticker.C: if n.coordinator != nil { n.coordinator.SendHeartbeat(n.ID) n.lastSeen.Store(time.Now().UnixMilli()) if n.logger != nil { n.logger.Debug(fmt.Sprintf("Node %s sent heartbeat at %s", n.ID, n.GetLastSeenStr())) } } } } } // GetNodeStatus возвращает текущий статус узла (атомарно) func (n *Node) GetNodeStatus() NodeStatus { return NodeStatus(n.Status.Load()) } // IsActive проверяет, активен ли узел func (n *Node) IsActive() bool { return NodeStatus(n.Status.Load()) == StatusActive } // SetCoordinator устанавливает координатора для узла func (n *Node) SetCoordinator(coord *RaftCoordinator) { n.coordinator = coord now := time.Now().UnixMilli() n.joinedAt.Store(now) if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s joined cluster at %s", n.ID, n.GetJoinedAtStr())) } } // JoinCluster присоединяет узел к кластеру func (n *Node) JoinCluster(coord *RaftCoordinator) error { if n.coordinator != nil { return fmt.Errorf("node already joined to cluster") } n.SetCoordinator(coord) // Регистрируем узел в координаторе if err := coord.RegisterNode(n); err != nil { return fmt.Errorf("failed to register node: %v", err) } n.Status.Store(int32(StatusActive)) if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s successfully joined cluster at %s", n.ID, n.GetJoinedAtStr())) } return nil } // LeaveCluster покидает кластер func (n *Node) LeaveCluster() error { if n.coordinator == nil { return fmt.Errorf("node not in cluster") } n.Status.Store(int32(StatusOffline)) if err := n.coordinator.RemoveNode(n.ID); err != nil { n.logger.Warn(fmt.Sprintf("Failed to remove node from coordinator: %v", err)) } n.coordinator = nil n.joinedAt.Store(0) if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s left cluster at %s", n.ID, time.Now().Format("2006-01-02 15:04:05.000"))) } return nil } // GetLastSeen возвращает время последнего контакта func (n *Node) GetLastSeen() int64 { return n.lastSeen.Load() } // GetLastSeenStr возвращает человекочитаемое время последнего контакта func (n *Node) GetLastSeenStr() string { lastSeen := n.lastSeen.Load() if lastSeen == 0 { return "never" } return time.UnixMilli(lastSeen).Format("2006-01-02 15:04:05.000") } // GetJoinedAt возвращает время присоединения к кластеру func (n *Node) GetJoinedAt() int64 { return n.joinedAt.Load() } // GetJoinedAtStr возвращает человекочитаемое время присоединения func (n *Node) GetJoinedAtStr() string { joinedAt := n.joinedAt.Load() if joinedAt == 0 { return "not joined" } return time.UnixMilli(joinedAt).Format("2006-01-02 15:04:05.000") } // GetStartedAt возвращает время старта узла func (n *Node) GetStartedAt() int64 { return n.startedAt } // GetStartedAtStr возвращает человекочитаемое время старта func (n *Node) GetStartedAtStr() string { return time.UnixMilli(n.startedAt).Format("2006-01-02 15:04:05.000") } // GetCreatedAt возвращает время создания узла func (n *Node) GetCreatedAt() int64 { return n.createdAt } // GetCreatedAtStr возвращает человекочитаемое время создания func (n *Node) GetCreatedAtStr() string { return time.UnixMilli(n.createdAt).Format("2006-01-02 15:04:05.000") } // GetUptime возвращает время работы узла func (n *Node) GetUptime() time.Duration { if n.startedAt == 0 { return 0 } return time.Duration(time.Now().UnixMilli()-n.startedAt) * time.Millisecond } // GetStats возвращает статистику узла func (n *Node) GetStats() map[string]interface{} { return map[string]interface{}{ "id": n.ID, "ip": n.IP, "port": n.Port, "status": n.GetNodeStatus(), "created_at": n.GetCreatedAtStr(), "started_at": n.GetStartedAtStr(), "joined_at": n.GetJoinedAtStr(), "last_seen": n.GetLastSeenStr(), "uptime": n.GetUptime().String(), "request_count": n.requestCount.Load(), "bytes_rx": n.bytesRx.Load(), "bytes_tx": n.bytesTx.Load(), } } // Stop останавливает работу узла func (n *Node) Stop() { n.Status.Store(int32(StatusOffline)) n.stoppedAt = time.Now().UnixMilli() close(n.stopChan) if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s stopped at %s", n.ID, time.UnixMilli(n.stoppedAt).Format("2006-01-02 15:04:05.000"))) } } // GetAddress возвращает адрес узла в формате "ip:port" func (n *Node) GetAddress() string { return fmt.Sprintf("%s:%d", n.IP, n.Port) } // ReplicateDocument отправляет документ на репликацию всем активным узлам func (n *Node) ReplicateDocument(database, collection string, doc *storage.Document) error { if n.coordinator == nil { if n.logger != nil { n.logger.Warn("No coordinator set, skipping replication") } return fmt.Errorf("no coordinator set") } // Получаем список всех узлов от координатора nodes := n.coordinator.GetActiveNodes() startTime := time.Now().UnixMilli() for _, nodeInfo := range nodes { if nodeInfo.ID == n.ID { continue // Пропускаем себя } // В реальной реализации здесь была бы отправка на узел if n.logger != nil { n.logger.Debug(fmt.Sprintf("Would replicate to node %s (last seen: %s)", nodeInfo.ID, time.UnixMilli(nodeInfo.LastSeen).Format("15:04:05.000"))) } } duration := time.Now().UnixMilli() - startTime if n.logger != nil { n.logger.Info(fmt.Sprintf("Replicated document %s to %d nodes (took %d ms)", doc.ID, len(nodes)-1, duration)) } return nil }