diff --git a/internal/cluster/node.go b/internal/cluster/node.go deleted file mode 100644 index b006c71..0000000 --- a/internal/cluster/node.go +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Copyright 2026 Safronov Grigorii - * - * Licensed under the CDDL, Version 1.0 (the "License"); - * you may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * https://opensource.org/licenses/CDDL-1.0 - */ - -// Файл: internal/cluster/node.go -// Назначение: Реализация узла кластера (node) для распределённой СУБД. -// Реализация узла кластера (node) для распределённой СУБД. -// Полностью lock-free с использованием атомарных операций. - -package cluster - -import ( - "encoding/json" - "fmt" - "net" - "sync/atomic" - "time" - - "futriis/internal/log" - "futriis/internal/storage" - "github.com/google/uuid" -) - -// NodeStatus представляет состояние узла кластера -type NodeStatus int32 - -const ( - StatusOffline NodeStatus = iota - StatusActive - StatusSyncing - StatusFailed -) - -// Node представляет отдельный узел в распределённой системе -type Node struct { - ID string // Уникальный идентификатор узла - IP string // IP-адрес узла - Port int // Порт для коммуникации - Status atomic.Int32 // Атомарный статус узла (NodeStatus) - Storage *storage.Storage - logger *log.Logger - coordinator *RaftCoordinator // Ссылка на координатора (теперь RaftCoordinator) - lastSeen atomic.Int64 // Время последнего heartbeat (Unix nano) - incomingConn chan net.Conn // Канал для входящих соединений (wait-free) - stopChan chan struct{} -} - -// NodeConfig содержит конфигурацию для создания узла -type NodeConfig struct { - IP string - Port int - Storage *storage.Storage - Logger *log.Logger - Coordinator *RaftCoordinator -} - -// NewNode создаёт новый экземпляр узла кластера -func NewNode(ip string, port int, store *storage.Storage, logger *log.Logger) *Node { - node := &Node{ - ID: uuid.New().String(), - IP: ip, - Port: port, - Storage: store, - logger: logger, - incomingConn: make(chan net.Conn, 1000), // Буферизованный канал для wait-free приёма - stopChan: make(chan struct{}), - } - node.Status.Store(int32(StatusActive)) - node.lastSeen.Store(time.Now().UnixNano()) - - // Запуск сервера для приёма межузловых соединений - go node.startTCPServer() - - // Запуск обработчика входящих соединений - go node.handleIncomingConnections() - - // Запуск heartbeat-отправки (если координатор известен) - go node.heartbeatLoop() - - return node -} - -// startTCPServer запускает TCP-сервер для приёма запросов от других узлов -func (n *Node) startTCPServer() { - addr := fmt.Sprintf("%s:%d", n.IP, n.Port) - listener, err := net.Listen("tcp", addr) - if err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s failed to start TCP server: %v", n.ID, err)) - } - n.Status.Store(int32(StatusFailed)) - return - } - defer listener.Close() - - if n.logger != nil { - n.logger.Info(fmt.Sprintf("Node %s listening on %s", n.ID, addr)) - } - - for { - select { - case <-n.stopChan: - return - default: - conn, err := listener.Accept() - if err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s accept error: %v", n.ID, err)) - } - continue - } - // Неблокирующая отправка в канал - select { - case n.incomingConn <- conn: - default: - if n.logger != nil { - n.logger.Warn(fmt.Sprintf("Node %s incoming connection queue full, dropping connection", n.ID)) - } - conn.Close() - } - } - } -} - -// handleIncomingConnections обрабатывает входящие соединения wait-free способом -func (n *Node) handleIncomingConnections() { - for { - select { - case <-n.stopChan: - return - case conn := <-n.incomingConn: - go n.handleNodeRequest(conn) - } - } -} - -// handleNodeRequest обрабатывает конкретный запрос от другого узла -func (n *Node) handleNodeRequest(conn net.Conn) { - defer conn.Close() - - decoder := json.NewDecoder(conn) - var req NodeRequest - if err := decoder.Decode(&req); err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s failed to decode request: %v", n.ID, err)) - } - return - } - - // Обновляем время последнего контакта - n.lastSeen.Store(time.Now().UnixNano()) - - // Маршрутизация запроса в зависимости от типа - switch req.Type { - case "replicate": - n.handleReplicateRequest(req.Data) - case "query": - n.handleQueryRequest(req.Data, conn) - case "sync": - n.handleSyncRequest(req.Data, conn) - default: - if n.logger != nil { - n.logger.Warn(fmt.Sprintf("Node %s unknown request type: %s", n.ID, req.Type)) - } - } -} - -// handleReplicateRequest обрабатывает запрос на репликацию документа -func (n *Node) handleReplicateRequest(data []byte) { - var repData struct { - Database string `json:"database"` - Collection string `json:"collection"` - Document map[string]interface{} `json:"document"` - } - - if err := json.Unmarshal(data, &repData); err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s failed to unmarshal replicate data: %v", n.ID, err)) - } - return - } - - // Получаем базу данных - db, err := n.Storage.GetDatabase(repData.Database) - if err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s database not found for replication: %s", n.ID, repData.Database)) - } - return - } - - // Получаем коллекцию - coll, err := db.GetCollection(repData.Collection) - if err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s collection not found for replication: %s", n.ID, repData.Collection)) - } - return - } - - // Создаём документ и вставляем - doc := &storage.Document{ - ID: repData.Document["_id"].(string), - Fields: repData.Document, - } - - if err := coll.Insert(doc); err != nil { - if n.logger != nil { - n.logger.Error(fmt.Sprintf("Node %s failed to replicate document: %v", n.ID, err)) - } - } else { - if n.logger != nil { - n.logger.Debug(fmt.Sprintf("Node %s replicated document %s", n.ID, doc.ID)) - } - } -} - -// handleQueryRequest обрабатывает запрос на чтение данных с узла -func (n *Node) handleQueryRequest(data []byte, conn net.Conn) { - var queryData struct { - Database string `json:"database"` - Collection string `json:"collection"` - DocumentID string `json:"document_id"` - } - - if err := json.Unmarshal(data, &queryData); err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Получаем базу данных - db, err := n.Storage.GetDatabase(queryData.Database) - if err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Получаем коллекцию - coll, err := db.GetCollection(queryData.Collection) - if err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Находим документ - doc, err := coll.Find(queryData.DocumentID) - if err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Отправляем успешный ответ - response := map[string]interface{}{ - "status": "success", - "data": doc, - } - encoder := json.NewEncoder(conn) - encoder.Encode(response) -} - -// handleSyncRequest обрабатывает запрос на синхронизацию всей коллекции -func (n *Node) handleSyncRequest(data []byte, conn net.Conn) { - var syncData struct { - Database string `json:"database"` - Collection string `json:"collection"` - } - - if err := json.Unmarshal(data, &syncData); err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Получаем базу данных - db, err := n.Storage.GetDatabase(syncData.Database) - if err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Получаем коллекцию - coll, err := db.GetCollection(syncData.Collection) - if err != nil { - n.sendErrorResponse(conn, err.Error()) - return - } - - // Получаем все документы - docs := coll.GetAllDocuments() - - response := map[string]interface{}{ - "status": "success", - "docs": docs, - "count": len(docs), - } - encoder := json.NewEncoder(conn) - encoder.Encode(response) -} - -// sendErrorResponse отправляет ошибку в ответ на запрос -func (n *Node) sendErrorResponse(conn net.Conn, errMsg string) { - response := map[string]interface{}{ - "status": "error", - "error": errMsg, - } - encoder := json.NewEncoder(conn) - encoder.Encode(response) -} - -// heartbeatLoop отправляет периодические сигналы жизни координатору -func (n *Node) heartbeatLoop() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-n.stopChan: - return - case <-ticker.C: - if n.coordinator != nil { - n.coordinator.SendHeartbeat(n.ID) - n.lastSeen.Store(time.Now().UnixNano()) - } - } - } -} - -// GetNodeStatus возвращает текущий статус узла (атомарно) -func (n *Node) GetNodeStatus() NodeStatus { - return NodeStatus(n.Status.Load()) -} - -// IsActive проверяет, активен ли узел -func (n *Node) IsActive() bool { - return NodeStatus(n.Status.Load()) == StatusActive -} - -// SetCoordinator устанавливает координатора для узла -func (n *Node) SetCoordinator(coord *RaftCoordinator) { - n.coordinator = coord - if n.logger != nil { - n.logger.Info(fmt.Sprintf("Node %s connected to coordinator", n.ID)) - } -} - -// Stop останавливает работу узла -func (n *Node) Stop() { - n.Status.Store(int32(StatusOffline)) - close(n.stopChan) - if n.logger != nil { - n.logger.Info(fmt.Sprintf("Node %s stopped", n.ID)) - } -} - -// GetAddress возвращает адрес узла в формате "ip:port" -func (n *Node) GetAddress() string { - return fmt.Sprintf("%s:%d", n.IP, n.Port) -} - -// ReplicateDocument отправляет документ на репликацию всем активным узлам -func (n *Node) ReplicateDocument(database, collection string, doc *storage.Document) error { - if n.coordinator == nil { - if n.logger != nil { - n.logger.Warn("No coordinator set, skipping replication") - } - return fmt.Errorf("no coordinator set") - } - - // Получаем список всех узлов от координатора - nodes := n.coordinator.GetActiveNodes() - - for _, nodeInfo := range nodes { - if nodeInfo.ID == n.ID { - continue // Пропускаем себя - } - - // В реальной реализации здесь была бы отправка на узел - if n.logger != nil { - n.logger.Debug(fmt.Sprintf("Would replicate to node %s", nodeInfo.ID)) - } - } - - return nil -}