From 6824a1864064ef2f7e82365de16a7fd63640c589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 17 May 2026 14:31:21 +0000 Subject: [PATCH] Delete internal/cluster/raft_coordinator.go --- internal/cluster/raft_coordinator.go | 750 --------------------------- 1 file changed, 750 deletions(-) delete mode 100644 internal/cluster/raft_coordinator.go diff --git a/internal/cluster/raft_coordinator.go b/internal/cluster/raft_coordinator.go deleted file mode 100644 index c0fd5cf..0000000 --- a/internal/cluster/raft_coordinator.go +++ /dev/null @@ -1,750 +0,0 @@ -/* - * Copyright 2026 Safronov Grigorii - * - * Licensed under the CDDL, Version 1.0 (the "License"); - * you may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * https://opensource.org/licenses/CDDL-1.0 - */ - -// Файл: internal/cluster/raft_coordinator.go -// Назначение: Реализация координатора распределённого кластера на основе Raft консенсус-алгоритма. -// Обеспечивает управление узлами кластера, выборы лидера, репликацию данных и отказоустойчивость. -// Поддерживает как одноузловой режим работы, так и многокластерную конфигурацию с синхронной/асинхронной репликацией. - -package cluster - -import ( - "encoding/json" - "fmt" - "io" - "net" - "os" - "path/filepath" - "sync" - "sync/atomic" - "time" - - "github.com/hashicorp/raft" - raftboltdb "github.com/hashicorp/raft-boltdb/v2" - "futriis/internal/log" - "futriis/internal/config" -) - -// RaftClusterState представляет состояние кластера для Raft FSM -type RaftClusterState struct { - Nodes map[string]*NodeInfo `json:"nodes"` - ReplicationFactor int32 `json:"replication_factor"` - mu sync.RWMutex -} - -// RaftCoordinator реализует координацию кластера через Raft -type RaftCoordinator struct { - raft *raft.Raft - fsm *RaftFSM - address string - raftAddr string - clusterName string - logger *log.Logger - config *config.Config - stopChan chan struct{} - nodes sync.Map - replicationFactor atomic.Int32 - replicationEnabled bool - masterMasterEnabled bool - syncReplication bool - isLeader atomic.Bool - leaderMonitor chan bool - singleNodeMode bool - localNodeInfo *NodeInfo -} - -// RaftFSM реализует конечный автомат для Raft -type RaftFSM struct { - state *RaftClusterState - logger *log.Logger -} - -// NodeRegistrationCommand команда регистрации узла -type NodeRegistrationCommand struct { - Type string `json:"type"` - Node NodeInfo `json:"node,omitempty"` - NodeID string `json:"node_id,omitempty"` - Factor int32 `json:"factor,omitempty"` -} - -// getLocalIP получает локальный IP адрес -func getLocalIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "127.0.0.1" - } - for _, addr := range addrs { - if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { - return ipnet.IP.String() - } - } - return "127.0.0.1" -} - -// NewRaftCoordinator создаёт новый Raft координатор -func NewRaftCoordinator(cfg *config.Config, logger *log.Logger) (*RaftCoordinator, error) { - // Используем IP из конфига или автоматически определяем - nodeIP := cfg.Cluster.NodeIP - if nodeIP == "" || nodeIP == "0.0.0.0" { - nodeIP = getLocalIP() - } - - raftAddr := fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.RaftPort) - - logger.Debug(fmt.Sprintf("Creating Raft coordinator at %s", raftAddr)) - - // Определяем одноузловой режим - singleNodeMode := len(cfg.Cluster.Nodes) <= 1 || cfg.Cluster.Bootstrap - - rc := &RaftCoordinator{ - address: fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.NodePort), - raftAddr: raftAddr, - clusterName: cfg.Cluster.Name, - logger: logger, - config: cfg, - stopChan: make(chan struct{}), - leaderMonitor: make(chan bool, 1), - replicationEnabled: cfg.Replication.Enabled, - masterMasterEnabled: cfg.Replication.MasterMaster, - syncReplication: cfg.Replication.SyncReplication, - singleNodeMode: singleNodeMode, - localNodeInfo: &NodeInfo{ - ID: fmt.Sprintf("%s-%s", cfg.Cluster.Name, nodeIP), - IP: nodeIP, - Port: cfg.Cluster.NodePort, - Status: "active", - LastSeen: time.Now().Unix(), - }, - } - rc.replicationFactor.Store(int32(3)) - - // Создаём FSM - rc.fsm = &RaftFSM{ - state: &RaftClusterState{ - Nodes: make(map[string]*NodeInfo), - }, - logger: logger, - } - - // В одноузловом режиме добавляем локальный узел в состояние - if singleNodeMode { - rc.fsm.state.mu.Lock() - rc.fsm.state.Nodes[rc.localNodeInfo.ID] = rc.localNodeInfo - rc.fsm.state.mu.Unlock() - rc.isLeader.Store(true) - logger.Debug(fmt.Sprintf("Single-node mode: local node added to state: %s", rc.localNodeInfo.ID)) - } - - // Настраиваем Raft - raftConfig := raft.DefaultConfig() - raftConfig.LocalID = raft.ServerID(rc.localNodeInfo.ID) - raftConfig.HeartbeatTimeout = 1000 * time.Millisecond - raftConfig.ElectionTimeout = 1000 * time.Millisecond - raftConfig.CommitTimeout = 500 * time.Millisecond - raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond - - // Для одноузлового кластера используем специальные настройки и подавляем предупреждения - if singleNodeMode { - raftConfig.HeartbeatTimeout = 500 * time.Millisecond - raftConfig.ElectionTimeout = 500 * time.Millisecond - raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond - // Подавляем вывод предупреждений для одноузлового режима - raftConfig.LogOutput = io.Discard - logger.Debug("Running in single-node mode (warnings suppressed)") - } else { - raftConfig.LogOutput = os.Stderr - } - - // Создаём директорию для Raft данных - dataDir := cfg.Cluster.RaftDataDir - if err := os.MkdirAll(dataDir, 0755); err != nil { - return nil, fmt.Errorf("failed to create raft data dir: %v", err) - } - - logger.Debug(fmt.Sprintf("Raft data directory: %s", dataDir)) - - // Создаём хранилище для логов - logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft-log.bolt")) - if err != nil { - return nil, fmt.Errorf("failed to create log store: %v", err) - } - - // Создаём хранилище для стабильных данных - stableStore, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft-stable.bolt")) - if err != nil { - return nil, fmt.Errorf("failed to create stable store: %v", err) - } - - // Создаём снапшот хранилище - snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr) - if err != nil { - return nil, fmt.Errorf("failed to create snapshot store: %v", err) - } - - // Создаём транспорт - transport, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, os.Stderr) - if err != nil { - return nil, fmt.Errorf("failed to create transport: %v", err) - } - - // Создаём Raft инстанс - r, err := raft.NewRaft(raftConfig, rc.fsm, logStore, stableStore, snapshotStore, transport) - if err != nil { - return nil, fmt.Errorf("failed to create raft: %v", err) - } - - rc.raft = r - - // Ждём некоторое время для инициализации Raft - time.Sleep(500 * time.Millisecond) - - // Проверяем, нужно ли делать bootstrap - bootstrapPath := filepath.Join(dataDir, "raft-log.bolt") - _, statErr := os.Stat(bootstrapPath) - needsBootstrap := os.IsNotExist(statErr) - - if needsBootstrap && singleNodeMode { - logger.Debug("Bootstrapping single-node cluster...") - - configuration := raft.Configuration{ - Servers: []raft.Server{ - { - ID: raftConfig.LocalID, - Address: transport.LocalAddr(), - }, - }, - } - - future := r.BootstrapCluster(configuration) - if err := future.Error(); err != nil { - logger.Warn(fmt.Sprintf("Bootstrap error: %v", err)) - } else { - logger.Debug("Single-node cluster bootstrapped successfully") - } - - // Ждём после bootstrap - time.Sleep(1 * time.Second) - - } else if needsBootstrap && len(cfg.Cluster.Nodes) > 1 { - logger.Debug("Bootstrapping multi-node cluster...") - - servers := make([]raft.Server, 0, len(cfg.Cluster.Nodes)) - for i, nodeAddr := range cfg.Cluster.Nodes { - serverID := raft.ServerID(fmt.Sprintf("%s-node%d", rc.clusterName, i+1)) - servers = append(servers, raft.Server{ - ID: serverID, - Address: raft.ServerAddress(nodeAddr), - }) - } - - configuration := raft.Configuration{ - Servers: servers, - } - - future := r.BootstrapCluster(configuration) - if err := future.Error(); err != nil { - logger.Warn(fmt.Sprintf("Bootstrap error: %v", err)) - } else { - logger.Debug("Multi-node cluster bootstrapped successfully") - } - - // Запускаем мониторинг лидера - go rc.monitorLeadership() - - // Ждём выборов лидера - logger.Debug("Waiting for leader election...") - timeout := time.After(5 * time.Second) - leaderElected := false - - for !leaderElected { - select { - case isLeader := <-rc.leaderMonitor: - if isLeader { - leaderElected = true - rc.isLeader.Store(true) - logger.Debug("This node is now the cluster leader") - } - case <-timeout: - logger.Warn("Leader election timeout") - leaderElected = true - } - } - } else { - // Существующее состояние, просто подключаемся - logger.Debug("Existing Raft state found, joining cluster...") - go rc.monitorLeadership() - - // В одноузловом режиме не ждём лидера - if !singleNodeMode { - // Проверяем, не являемся ли мы лидером - time.Sleep(1 * time.Second) - if r.State() == raft.Leader { - rc.isLeader.Store(true) - logger.Debug("This node is the cluster leader") - } - } - } - - logger.Debug(fmt.Sprintf("Raft coordinator started at %s, IsLeader: %v, SingleNodeMode: %v", raftAddr, rc.isLeader.Load(), singleNodeMode)) - - return rc, nil -} - -// monitorLeadership отслеживает изменения лидера -func (rc *RaftCoordinator) monitorLeadership() { - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - - wasLeader := false - - for { - select { - case <-rc.stopChan: - return - case <-ticker.C: - if rc.raft == nil { - continue - } - isLeader := rc.raft.State() == raft.Leader - if isLeader != wasLeader { - wasLeader = isLeader - select { - case rc.leaderMonitor <- isLeader: - default: - } - if isLeader { - rc.isLeader.Store(true) - rc.logger.Debug("Leadership acquired") - } else { - rc.isLeader.Store(false) - rc.logger.Debug("Leadership lost") - } - } - } - } -} - -// Apply применяет команду к FSM -func (f *RaftFSM) Apply(log *raft.Log) interface{} { - var cmd NodeRegistrationCommand - if err := json.Unmarshal(log.Data, &cmd); err != nil { - f.logger.Error(fmt.Sprintf("Failed to unmarshal raft command: %v", err)) - return err - } - - f.state.mu.Lock() - defer f.state.mu.Unlock() - - switch cmd.Type { - case "register": - f.state.Nodes[cmd.Node.ID] = &cmd.Node - f.logger.Debug(fmt.Sprintf("Raft: Node registered: %s", cmd.Node.ID)) - case "remove": - delete(f.state.Nodes, cmd.NodeID) - f.logger.Debug(fmt.Sprintf("Raft: Node removed: %s", cmd.NodeID)) - case "set_replication_factor": - f.state.ReplicationFactor = cmd.Factor - f.logger.Debug(fmt.Sprintf("Raft: Replication factor set to %d", cmd.Factor)) - } - - return nil -} - -// Snapshot реализует создание снапшота -func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) { - f.state.mu.RLock() - defer f.state.mu.RUnlock() - - stateCopy := &RaftClusterState{ - Nodes: make(map[string]*NodeInfo), - ReplicationFactor: f.state.ReplicationFactor, - } - for k, v := range f.state.Nodes { - stateCopy.Nodes[k] = v - } - - return &RaftSnapshot{state: stateCopy}, nil -} - -// Restore восстанавливает состояние из снапшота -func (f *RaftFSM) Restore(snapshot io.ReadCloser) error { - defer snapshot.Close() - - var state RaftClusterState - decoder := json.NewDecoder(snapshot) - if err := decoder.Decode(&state); err != nil { - return err - } - - f.state.mu.Lock() - defer f.state.mu.Unlock() - f.state.Nodes = state.Nodes - f.state.ReplicationFactor = state.ReplicationFactor - - return nil -} - -// RaftSnapshot реализует интерфейс FSMSnapshot -type RaftSnapshot struct { - state *RaftClusterState -} - -// Persist сохраняет снапшот -func (s *RaftSnapshot) Persist(sink raft.SnapshotSink) error { - err := func() error { - data, err := json.Marshal(s.state) - if err != nil { - return err - } - - if _, err := sink.Write(data); err != nil { - return err - } - - return sink.Close() - }() - - if err != nil { - sink.Cancel() - return err - } - - return nil -} - -// Release освобождает ресурсы -func (s *RaftSnapshot) Release() {} - -// RegisterNode регистрирует узел через Raft -func (rc *RaftCoordinator) RegisterNode(node *Node) error { - nodeInfo := &NodeInfo{ - ID: node.ID, - IP: node.IP, - Port: node.Port, - Status: "active", - LastSeen: time.Now().Unix(), - } - - // В одноузловом режиме всегда считаем себя лидером - if rc.singleNodeMode { - rc.logger.Debug("Single-node mode: registering node without Raft consensus") - - // Просто сохраняем узел локально - rc.nodes.Store(node.ID, nodeInfo) - - // Также сохраняем в FSM - rc.fsm.state.mu.Lock() - rc.fsm.state.Nodes[node.ID] = nodeInfo - rc.fsm.state.mu.Unlock() - - rc.logger.Debug(fmt.Sprintf("Node registered locally in single-node mode: %s", node.ID)) - return nil - } - - // Проверяем, является ли текущий узел лидером - if !rc.IsLeader() { - leader := rc.GetLeader() - if leader != nil { - rc.logger.Warn(fmt.Sprintf("Current node is not leader. Leader is %s:%d", leader.IP, leader.Port)) - return fmt.Errorf("node is not the leader. Please connect to leader at %s:%d", leader.IP, leader.Port) - } - return fmt.Errorf("node is not the leader and no leader found") - } - - cmd := NodeRegistrationCommand{ - Type: "register", - Node: *nodeInfo, - } - - data, err := json.Marshal(cmd) - if err != nil { - return err - } - - future := rc.raft.Apply(data, 5*time.Second) - if err := future.Error(); err != nil { - return fmt.Errorf("failed to register node via raft: %v", err) - } - - rc.nodes.Store(node.ID, nodeInfo) - - rc.logger.Debug(fmt.Sprintf("Node registered via Raft: %s", node.ID)) - return nil -} - -// RemoveNode удаляет узел через Raft -func (rc *RaftCoordinator) RemoveNode(nodeID string) error { - if rc.singleNodeMode { - rc.nodes.Delete(nodeID) - rc.fsm.state.mu.Lock() - delete(rc.fsm.state.Nodes, nodeID) - rc.fsm.state.mu.Unlock() - rc.logger.Debug(fmt.Sprintf("Node removed locally in single-node mode: %s", nodeID)) - return nil - } - - if !rc.IsLeader() { - return fmt.Errorf("node is not the leader") - } - - cmd := NodeRegistrationCommand{ - Type: "remove", - NodeID: nodeID, - } - - data, err := json.Marshal(cmd) - if err != nil { - return err - } - - future := rc.raft.Apply(data, 5*time.Second) - if err := future.Error(); err != nil { - return fmt.Errorf("failed to remove node via raft: %v", err) - } - - rc.nodes.Delete(nodeID) - rc.logger.Debug(fmt.Sprintf("Node removed via Raft: %s", nodeID)) - return nil -} - -// GetActiveNodes возвращает активные узлы -func (rc *RaftCoordinator) GetActiveNodes() []*NodeInfo { - nodes := make([]*NodeInfo, 0) - now := time.Now().Unix() - - state := rc.fsm.state - state.mu.RLock() - defer state.mu.RUnlock() - - for _, nodeInfo := range state.Nodes { - if now-nodeInfo.LastSeen < 30 { - nodes = append(nodes, nodeInfo) - } - } - - // В одноузловом режиме, если список пуст, возвращаем локальный узел - if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil { - nodes = append(nodes, rc.localNodeInfo) - } - - return nodes -} - -// GetAllNodes возвращает все узлы -func (rc *RaftCoordinator) GetAllNodes() []*NodeInfo { - state := rc.fsm.state - state.mu.RLock() - defer state.mu.RUnlock() - - nodes := make([]*NodeInfo, 0, len(state.Nodes)) - for _, node := range state.Nodes { - nodes = append(nodes, node) - } - - // В одноузловом режиме, если список пуст, возвращаем локальный узел - if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil { - nodes = append(nodes, rc.localNodeInfo) - } - - return nodes -} - -// GetLeader возвращает лидера -func (rc *RaftCoordinator) GetLeader() *NodeInfo { - if rc.singleNodeMode { - return rc.localNodeInfo - } - - leaderAddr := rc.raft.Leader() - if leaderAddr == "" { - return nil - } - - state := rc.fsm.state - state.mu.RLock() - defer state.mu.RUnlock() - - for _, node := range state.Nodes { - nodeAddr := fmt.Sprintf("%s:%d", node.IP, node.Port) - if nodeAddr == string(leaderAddr) { - return node - } - } - return nil -} - -// IsLeader проверяет, является ли текущий узел лидером -func (rc *RaftCoordinator) IsLeader() bool { - // В одноузловом режиме всегда лидер - if rc.singleNodeMode { - return true - } - return rc.isLeader.Load() -} - -// SendHeartbeat обновляет heartbeat узла -func (rc *RaftCoordinator) SendHeartbeat(nodeID string) { - if val, ok := rc.nodes.Load(nodeID); ok { - nodeInfo := val.(*NodeInfo) - nodeInfo.LastSeen = time.Now().Unix() - rc.nodes.Store(nodeID, nodeInfo) - } - - // Также обновляем в FSM - rc.fsm.state.mu.Lock() - if nodeInfo, ok := rc.fsm.state.Nodes[nodeID]; ok { - nodeInfo.LastSeen = time.Now().Unix() - } - rc.fsm.state.mu.Unlock() -} - -// GetClusterStatus возвращает статус кластера -func (rc *RaftCoordinator) GetClusterStatus() *ClusterStatus { - nodes := rc.GetAllNodes() - activeNodes := rc.GetActiveNodes() - - syncingNodes := 0 - for _, node := range nodes { - if node.Status == "syncing" { - syncingNodes++ - } - } - - leader := rc.GetLeader() - leaderID := "" - if leader != nil { - leaderID = leader.ID - } - - return &ClusterStatus{ - Name: rc.clusterName, - TotalNodes: len(nodes), - ActiveNodes: len(activeNodes), - SyncingNodes: syncingNodes, - FailedNodes: len(nodes) - len(activeNodes), - ReplicationFactor: int(rc.replicationFactor.Load()), - LeaderID: leaderID, - Health: rc.calculateHealth(), - } -} - -// calculateHealth вычисляет здоровье кластера -func (rc *RaftCoordinator) calculateHealth() string { - activeNodes := rc.GetActiveNodes() - totalNodes := rc.GetAllNodes() - - if len(totalNodes) == 0 { - return "critical" - } - - ratio := float64(len(activeNodes)) / float64(len(totalNodes)) - if ratio >= 0.8 { - return "healthy" - } else if ratio >= 0.5 { - return "degraded" - } - return "critical" -} - -// GetReplicationFactor возвращает фактор репликации -func (rc *RaftCoordinator) GetReplicationFactor() int { - return int(rc.replicationFactor.Load()) -} - -// SetReplicationFactor устанавливает фактор репликации через Raft -func (rc *RaftCoordinator) SetReplicationFactor(factor int) error { - if !rc.IsLeader() { - return fmt.Errorf("node is not the leader") - } - - cmd := NodeRegistrationCommand{ - Type: "set_replication_factor", - Factor: int32(factor), - } - - data, err := json.Marshal(cmd) - if err != nil { - return err - } - - future := rc.raft.Apply(data, 5*time.Second) - if err := future.Error(); err != nil { - return fmt.Errorf("failed to set replication factor via raft: %v", err) - } - - rc.replicationFactor.Store(int32(factor)) - rc.logger.Debug(fmt.Sprintf("Replication factor set to %d via Raft", factor)) - return nil -} - -// GetClusterHealth возвращает детальную информацию о здоровье кластера -func (rc *RaftCoordinator) GetClusterHealth() *ClusterHealth { - health := &ClusterHealth{ - Nodes: make(map[string]*NodeHealth), - OverallScore: 100.0, - Recommendations: "", - } - - now := time.Now().Unix() - state := rc.fsm.state - state.mu.RLock() - defer state.mu.RUnlock() - - for nodeID, nodeInfo := range state.Nodes { - nodeHealth := &NodeHealth{ - Status: nodeInfo.Status, - LatencyMs: 0, - LastCheck: now, - } - - if now-nodeInfo.LastSeen > 30 { - nodeHealth.Status = "offline" - health.OverallScore -= 10 - } else if nodeInfo.Status == "syncing" { - health.OverallScore -= 5 - } - - health.Nodes[nodeID] = nodeHealth - } - - if health.OverallScore < 50 { - health.Recommendations = "Critical: Check network connectivity and node health immediately" - } else if health.OverallScore < 80 { - health.Recommendations = "Warning: Some nodes are offline or syncing, consider adding more nodes" - } else { - health.Recommendations = "Cluster is healthy, all systems operational" - } - - return health -} - -// IsReplicationEnabled возвращает статус репликации -func (rc *RaftCoordinator) IsReplicationEnabled() bool { - return rc.replicationEnabled -} - -// IsMasterMasterEnabled возвращает статус мастер-мастер репликации -func (rc *RaftCoordinator) IsMasterMasterEnabled() bool { - return rc.masterMasterEnabled -} - -// IsSyncReplicationEnabled возвращает статус синхронной репликации -func (rc *RaftCoordinator) IsSyncReplicationEnabled() bool { - return rc.syncReplication -} - -// Stop останавливает координатор -func (rc *RaftCoordinator) Stop() { - close(rc.stopChan) - if rc.raft != nil { - rc.raft.Shutdown() - } - rc.logger.Debug("Raft coordinator stopped") -}