// /futriis/internal/cluster/node.go // Пакет cluster реализует управление кластером и репликацию package cluster import ( "encoding/json" "errors" "fmt" "net" "sync" "sync/atomic" "time" "futriis/pkg/config" "futriis/pkg/utils" ) // NodeState состояние узла type NodeState int const ( StateOffline NodeState = iota StateJoining StateOnline StateLeaving StateFailed ) func (s NodeState) String() string { switch s { case StateOffline: return "offline" case StateJoining: return "joining" case StateOnline: return "online" case StateLeaving: return "leaving" case StateFailed: return "failed" default: return "unknown" } } // Node представляет узел кластера type Node struct { ID string `json:"id"` Address string `json:"address"` State NodeState `json:"state"` LastSeen time.Time `json:"last_seen"` ShardIDs []string `json:"shard_ids"` mu sync.RWMutex } // ClusterManager управляет кластером type ClusterManager struct { nodes map[string]*Node coordinatorAddr string nodeID string localAddr string isCoordinator bool mu sync.RWMutex heartbeatStop chan struct{} stats struct { totalNodes int64 activeNodes int64 rebalancingCnt int64 } } // NewClusterManager создаёт новый менеджер кластера func NewClusterManager(cfg *config.Config) *ClusterManager { cm := &ClusterManager{ nodes: make(map[string]*Node), coordinatorAddr: cfg.Cluster.CoordinatorAddress, nodeID: cfg.Node.ID, localAddr: cfg.Node.Address, isCoordinator: cfg.Node.Address == cfg.Cluster.CoordinatorAddress, heartbeatStop: make(chan struct{}), } // Добавляем себя в кластер cm.addNode(&Node{ ID: cfg.Node.ID, Address: cfg.Node.Address, State: StateOnline, LastSeen: time.Now(), }) return cm } // Start запускает кластерные сервисы func (cm *ClusterManager) Start() error { if cm.isCoordinator { // Запускаем координатор go cm.startCoordinator() } // Запускаем heartbeat go cm.heartbeatLoop() utils.PrintInfo("Кластерный менеджер запущен") return nil } // Stop останавливает кластерные сервисы func (cm *ClusterManager) Stop() { close(cm.heartbeatStop) } // heartbeatLoop отправляет heartbeat сигналы func (cm *ClusterManager) heartbeatLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: cm.sendHeartbeat() case <-cm.heartbeatStop: return } } } // sendHeartbeat отправляет heartbeat координатору func (cm *ClusterManager) sendHeartbeat() { if !cm.isCoordinator { // Отправляем heartbeat координатору conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second) if err != nil { return } defer conn.Close() heartbeat := map[string]interface{}{ "type": "heartbeat", "node_id": cm.nodeID, "address": cm.localAddr, "time": time.Now().Unix(), } json.NewEncoder(conn).Encode(heartbeat) } } // startCoordinator запускает координатор кластера func (cm *ClusterManager) startCoordinator() { listener, err := net.Listen("tcp", cm.coordinatorAddr) if err != nil { utils.PrintError("Ошибка запуска координатора: %v", err) return } defer listener.Close() utils.PrintInfo("Координатор кластера запущен на " + cm.coordinatorAddr) for { conn, err := listener.Accept() if err != nil { continue } go cm.handleCoordinatorRequest(conn) } } // handleCoordinatorRequest обрабатывает запросы к координатору func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) { defer conn.Close() var req map[string]interface{} if err := json.NewDecoder(conn).Decode(&req); err != nil { return } msgType, _ := req["type"].(string) switch msgType { case "heartbeat": nodeID, _ := req["node_id"].(string) address, _ := req["address"].(string) cm.updateNodeHeartbeat(nodeID, address) case "join": nodeID, _ := req["node_id"].(string) address, _ := req["address"].(string) cm.handleNodeJoin(nodeID, address) case "leave": nodeID, _ := req["node_id"].(string) cm.handleNodeLeave(nodeID) } } // handleNodeJoin обрабатывает присоединение узла func (cm *ClusterManager) handleNodeJoin(nodeID, address string) { cm.mu.Lock() defer cm.mu.Unlock() if node, exists := cm.nodes[nodeID]; exists { node.State = StateOnline node.LastSeen = time.Now() } else { cm.nodes[nodeID] = &Node{ ID: nodeID, Address: address, State: StateOnline, LastSeen: time.Now(), } atomic.AddInt64(&cm.stats.totalNodes, 1) } atomic.AddInt64(&cm.stats.activeNodes, 1) utils.PrintSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address) } // handleNodeLeave обрабатывает отключение узла func (cm *ClusterManager) handleNodeLeave(nodeID string) { cm.mu.Lock() defer cm.mu.Unlock() if node, exists := cm.nodes[nodeID]; exists { node.State = StateOffline atomic.AddInt64(&cm.stats.activeNodes, -1) utils.PrintWarning("Узел %s покинул кластер", nodeID) } } // updateNodeHeartbeat обновляет время последнего heartbeat узла func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) { cm.mu.Lock() defer cm.mu.Unlock() if node, exists := cm.nodes[nodeID]; exists { node.LastSeen = time.Now() if node.State == StateOffline { node.State = StateOnline atomic.AddInt64(&cm.stats.activeNodes, 1) } } else { cm.nodes[nodeID] = &Node{ ID: nodeID, Address: address, State: StateOnline, LastSeen: time.Now(), } atomic.AddInt64(&cm.stats.totalNodes, 1) atomic.AddInt64(&cm.stats.activeNodes, 1) } } // AddNode добавляет новый узел в кластер func (cm *ClusterManager) AddNode(address string) error { if !cm.isCoordinator { return errors.New("только координатор может добавлять узлы") } cm.mu.Lock() defer cm.mu.Unlock() // Генерируем ID для нового узла nodeID := fmt.Sprintf("node-%d", len(cm.nodes)+1) cm.nodes[nodeID] = &Node{ ID: nodeID, Address: address, State: StateJoining, LastSeen: time.Now(), } atomic.AddInt64(&cm.stats.totalNodes, 1) utils.PrintSuccess("Узел %s (%s) добавлен в кластер", nodeID, address) return nil } // RemoveNode удаляет узел из кластера func (cm *ClusterManager) RemoveNode(nodeID string) error { if !cm.isCoordinator { return errors.New("только координатор может удалять узлы") } cm.mu.Lock() defer cm.mu.Unlock() if node, exists := cm.nodes[nodeID]; exists { node.State = StateLeaving delete(cm.nodes, nodeID) atomic.AddInt64(&cm.stats.totalNodes, -1) if node.State == StateOnline { atomic.AddInt64(&cm.stats.activeNodes, -1) } utils.PrintSuccess("Узел %s удален из кластера", nodeID) return nil } return errors.New("узел не найден") } // GetClusterStatus возвращает статус кластера func (cm *ClusterManager) GetClusterStatus() map[string]interface{} { cm.mu.RLock() defer cm.mu.RUnlock() status := make(map[string]interface{}) status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes) status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes) status["is_coordinator"] = cm.isCoordinator status["coordinator"] = cm.coordinatorAddr nodes := make([]map[string]interface{}, 0, len(cm.nodes)) for id, node := range cm.nodes { nodes = append(nodes, map[string]interface{}{ "id": id, "address": node.Address, "state": node.State.String(), "last_seen": node.LastSeen.Format(time.RFC3339), }) } status["nodes"] = nodes return status } // addNode добавляет узел во внутреннюю карту func (cm *ClusterManager) addNode(node *Node) { cm.mu.Lock() defer cm.mu.Unlock() cm.nodes[node.ID] = node atomic.AddInt64(&cm.stats.totalNodes, 1) if node.State == StateOnline { atomic.AddInt64(&cm.stats.activeNodes, 1) } }