// Файл: internal/cluster/node.go // Назначение: Реализация узла кластера (node) для распределённой СУБД. // Реализация узла кластера (node) для распределённой СУБД. // Полностью lock-free с использованием атомарных операций. package cluster import ( "encoding/json" "fmt" "net" "sync/atomic" "time" "futriis/internal/log" "futriis/internal/storage" "github.com/google/uuid" ) // NodeStatus представляет состояние узла кластера type NodeStatus int32 const ( StatusOffline NodeStatus = iota StatusActive StatusSyncing StatusFailed ) // Node представляет отдельный узел в распределённой системе type Node struct { ID string // Уникальный идентификатор узла IP string // IP-адрес узла Port int // Порт для коммуникации Status atomic.Int32 // Атомарный статус узла (NodeStatus) Storage *storage.Storage logger *log.Logger coordinator *RaftCoordinator // Ссылка на координатора (теперь RaftCoordinator) lastSeen atomic.Int64 // Время последнего heartbeat (Unix nano) incomingConn chan net.Conn // Канал для входящих соединений (wait-free) stopChan chan struct{} } // NodeConfig содержит конфигурацию для создания узла type NodeConfig struct { IP string Port int Storage *storage.Storage Logger *log.Logger Coordinator *RaftCoordinator } // NewNode создаёт новый экземпляр узла кластера func NewNode(ip string, port int, store *storage.Storage, logger *log.Logger) *Node { node := &Node{ ID: uuid.New().String(), IP: ip, Port: port, Storage: store, logger: logger, incomingConn: make(chan net.Conn, 1000), // Буферизованный канал для wait-free приёма stopChan: make(chan struct{}), } node.Status.Store(int32(StatusActive)) node.lastSeen.Store(time.Now().UnixNano()) // Запуск сервера для приёма межузловых соединений go node.startTCPServer() // Запуск обработчика входящих соединений go node.handleIncomingConnections() // Запуск heartbeat-отправки (если координатор известен) go node.heartbeatLoop() return node } // startTCPServer запускает TCP-сервер для приёма запросов от других узлов func (n *Node) startTCPServer() { addr := fmt.Sprintf("%s:%d", n.IP, n.Port) listener, err := net.Listen("tcp", addr) if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to start TCP server: %v", n.ID, err)) } n.Status.Store(int32(StatusFailed)) return } defer listener.Close() if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s listening on %s", n.ID, addr)) } for { select { case <-n.stopChan: return default: conn, err := listener.Accept() if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s accept error: %v", n.ID, err)) } continue } // Неблокирующая отправка в канал select { case n.incomingConn <- conn: default: if n.logger != nil { n.logger.Warn(fmt.Sprintf("Node %s incoming connection queue full, dropping connection", n.ID)) } conn.Close() } } } } // handleIncomingConnections обрабатывает входящие соединения wait-free способом func (n *Node) handleIncomingConnections() { for { select { case <-n.stopChan: return case conn := <-n.incomingConn: go n.handleNodeRequest(conn) } } } // handleNodeRequest обрабатывает конкретный запрос от другого узла func (n *Node) handleNodeRequest(conn net.Conn) { defer conn.Close() decoder := json.NewDecoder(conn) var req NodeRequest if err := decoder.Decode(&req); err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to decode request: %v", n.ID, err)) } return } // Обновляем время последнего контакта n.lastSeen.Store(time.Now().UnixNano()) // Маршрутизация запроса в зависимости от типа switch req.Type { case "replicate": n.handleReplicateRequest(req.Data) case "query": n.handleQueryRequest(req.Data, conn) case "sync": n.handleSyncRequest(req.Data, conn) default: if n.logger != nil { n.logger.Warn(fmt.Sprintf("Node %s unknown request type: %s", n.ID, req.Type)) } } } // handleReplicateRequest обрабатывает запрос на репликацию документа func (n *Node) handleReplicateRequest(data []byte) { var repData struct { Database string `json:"database"` Collection string `json:"collection"` Document map[string]interface{} `json:"document"` } if err := json.Unmarshal(data, &repData); err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to unmarshal replicate data: %v", n.ID, err)) } return } // Получаем базу данных db, err := n.Storage.GetDatabase(repData.Database) if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s database not found for replication: %s", n.ID, repData.Database)) } return } // Получаем коллекцию coll, err := db.GetCollection(repData.Collection) if err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s collection not found for replication: %s", n.ID, repData.Collection)) } return } // Создаём документ и вставляем doc := &storage.Document{ ID: repData.Document["_id"].(string), Fields: repData.Document, } if err := coll.Insert(doc); err != nil { if n.logger != nil { n.logger.Error(fmt.Sprintf("Node %s failed to replicate document: %v", n.ID, err)) } } else { if n.logger != nil { n.logger.Debug(fmt.Sprintf("Node %s replicated document %s", n.ID, doc.ID)) } } } // handleQueryRequest обрабатывает запрос на чтение данных с узла func (n *Node) handleQueryRequest(data []byte, conn net.Conn) { var queryData struct { Database string `json:"database"` Collection string `json:"collection"` DocumentID string `json:"document_id"` } if err := json.Unmarshal(data, &queryData); err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем базу данных db, err := n.Storage.GetDatabase(queryData.Database) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем коллекцию coll, err := db.GetCollection(queryData.Collection) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Находим документ doc, err := coll.Find(queryData.DocumentID) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Отправляем успешный ответ response := map[string]interface{}{ "status": "success", "data": doc, } encoder := json.NewEncoder(conn) encoder.Encode(response) } // handleSyncRequest обрабатывает запрос на синхронизацию всей коллекции func (n *Node) handleSyncRequest(data []byte, conn net.Conn) { var syncData struct { Database string `json:"database"` Collection string `json:"collection"` } if err := json.Unmarshal(data, &syncData); err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем базу данных db, err := n.Storage.GetDatabase(syncData.Database) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем коллекцию coll, err := db.GetCollection(syncData.Collection) if err != nil { n.sendErrorResponse(conn, err.Error()) return } // Получаем все документы docs := coll.GetAllDocuments() response := map[string]interface{}{ "status": "success", "docs": docs, "count": len(docs), } encoder := json.NewEncoder(conn) encoder.Encode(response) } // sendErrorResponse отправляет ошибку в ответ на запрос func (n *Node) sendErrorResponse(conn net.Conn, errMsg string) { response := map[string]interface{}{ "status": "error", "error": errMsg, } encoder := json.NewEncoder(conn) encoder.Encode(response) } // heartbeatLoop отправляет периодические сигналы жизни координатору func (n *Node) heartbeatLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-n.stopChan: return case <-ticker.C: if n.coordinator != nil { n.coordinator.SendHeartbeat(n.ID) n.lastSeen.Store(time.Now().UnixNano()) } } } } // GetNodeStatus возвращает текущий статус узла (атомарно) func (n *Node) GetNodeStatus() NodeStatus { return NodeStatus(n.Status.Load()) } // IsActive проверяет, активен ли узел func (n *Node) IsActive() bool { return NodeStatus(n.Status.Load()) == StatusActive } // SetCoordinator устанавливает координатора для узла func (n *Node) SetCoordinator(coord *RaftCoordinator) { n.coordinator = coord if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s connected to coordinator", n.ID)) } } // Stop останавливает работу узла func (n *Node) Stop() { n.Status.Store(int32(StatusOffline)) close(n.stopChan) if n.logger != nil { n.logger.Info(fmt.Sprintf("Node %s stopped", n.ID)) } } // GetAddress возвращает адрес узла в формате "ip:port" func (n *Node) GetAddress() string { return fmt.Sprintf("%s:%d", n.IP, n.Port) } // ReplicateDocument отправляет документ на репликацию всем активным узлам func (n *Node) ReplicateDocument(database, collection string, doc *storage.Document) error { if n.coordinator == nil { if n.logger != nil { n.logger.Warn("No coordinator set, skipping replication") } return fmt.Errorf("no coordinator set") } // Получаем список всех узлов от координатора nodes := n.coordinator.GetActiveNodes() for _, nodeInfo := range nodes { if nodeInfo.ID == n.ID { continue // Пропускаем себя } // В реальной реализации здесь была бы отправка на узел if n.logger != nil { n.logger.Debug(fmt.Sprintf("Would replicate to node %s", nodeInfo.ID)) } } return nil }