/* * Copyright 2026 Safronov Grigorii * * Licensed under the CDDL, Version 1.0 (the "License"); * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * https://opensource.org/licenses/CDDL-1.0 */ // Файл: internal/cluster/raft_coordinator.go // Назначение: Реализация координатора распределённого кластера на основе Raft консенсус-алгоритма. // Обеспечивает управление узлами кластера, выборы лидера, репликацию данных и отказоустойчивость. // Поддерживает как одноузловой режим работы, так и многокластерную конфигурацию с синхронной/асинхронной репликацией. package cluster import ( "encoding/json" "fmt" "io" "net" "os" "path/filepath" "sync" "sync/atomic" "time" "github.com/hashicorp/raft" "futriis/internal/log" "futriis/internal/config" ) // InmemStore реализует встроенное файловое хранилище для Raft // Реализует интерфейсы raft.LogStore и raft.StableStore type InmemStore struct { mu sync.RWMutex data map[string][]byte path string } // NewInmemStore создаёт новое хранилище func NewInmemStore(path string) *InmemStore { store := &InmemStore{ data: make(map[string][]byte), path: path, } // Пытаемся загрузить существующие данные store.load() return store } // load загружает данные из файла func (s *InmemStore) load() { if s.path == "" { return } data, err := os.ReadFile(s.path) if err != nil { return } json.Unmarshal(data, &s.data) } // save сохраняет данные в файл func (s *InmemStore) save() { if s.path == "" { return } data, _ := json.Marshal(s.data) os.WriteFile(s.path, data, 0644) } // ==================== Реализация raft.LogStore ==================== // FirstIndex возвращает первый индекс в логе func (s *InmemStore) FirstIndex() (uint64, error) { s.mu.RLock() defer s.mu.RUnlock() var first uint64 = 0 for key := range s.data { var idx uint64 if _, err := fmt.Sscanf(key, "log-%d", &idx); err == nil { if first == 0 || idx < first { first = idx } } } return first, nil } // LastIndex возвращает последний индекс в логе func (s *InmemStore) LastIndex() (uint64, error) { s.mu.RLock() defer s.mu.RUnlock() var last uint64 = 0 for key := range s.data { var idx uint64 if _, err := fmt.Sscanf(key, "log-%d", &idx); err == nil { if idx > last { last = idx } } } return last, nil } // GetLog получает лог по индексу func (s *InmemStore) GetLog(idx uint64, log *raft.Log) error { s.mu.RLock() defer s.mu.RUnlock() key := fmt.Sprintf("log-%d", idx) data, ok := s.data[key] if !ok { return raft.ErrLogNotFound } return json.Unmarshal(data, log) } // StoreLog сохраняет один лог func (s *InmemStore) StoreLog(log *raft.Log) error { return s.StoreLogs([]*raft.Log{log}) } // StoreLogs сохраняет несколько логов func (s *InmemStore) StoreLogs(logs []*raft.Log) error { s.mu.Lock() defer s.mu.Unlock() for _, log := range logs { key := fmt.Sprintf("log-%d", log.Index) data, err := json.Marshal(log) if err != nil { return err } s.data[key] = data } s.save() return nil } // DeleteRange удаляет диапазон логов func (s *InmemStore) DeleteRange(min, max uint64) error { s.mu.Lock() defer s.mu.Unlock() for idx := min; idx <= max; idx++ { key := fmt.Sprintf("log-%d", idx) delete(s.data, key) } s.save() return nil } // ==================== Реализация raft.StableStore ==================== // Get реализует raft.StableStore func (s *InmemStore) Get(key []byte) ([]byte, error) { s.mu.RLock() defer s.mu.RUnlock() val, ok := s.data[string(key)] if !ok { return nil, nil } return val, nil } // Set реализует raft.StableStore func (s *InmemStore) Set(key []byte, val []byte) error { s.mu.Lock() defer s.mu.Unlock() s.data[string(key)] = val s.save() return nil } // SetUint64 реализует raft.StableStore func (s *InmemStore) SetUint64(key []byte, val uint64) error { return s.Set(key, []byte(fmt.Sprintf("%d", val))) } // GetUint64 реализует raft.StableStore func (s *InmemStore) GetUint64(key []byte) (uint64, error) { val, err := s.Get(key) if err != nil { return 0, err } if val == nil { return 0, nil } var result uint64 fmt.Sscanf(string(val), "%d", &result) return result, nil } // RaftClusterState представляет состояние кластера для Raft FSM type RaftClusterState struct { Nodes map[string]*NodeInfo `json:"nodes"` ReplicationFactor int32 `json:"replication_factor"` mu sync.RWMutex } // RaftCoordinator реализует координацию кластера через Raft type RaftCoordinator struct { raft *raft.Raft fsm *RaftFSM address string raftAddr string clusterName string logger *log.Logger config *config.Config stopChan chan struct{} nodes sync.Map replicationFactor atomic.Int32 replicationEnabled bool masterMasterEnabled bool syncReplication bool isLeader atomic.Bool leaderMonitor chan bool singleNodeMode bool localNodeInfo *NodeInfo logStore *InmemStore stableStore *InmemStore } // RaftFSM реализует конечный автомат для Raft type RaftFSM struct { state *RaftClusterState logger *log.Logger } // NodeRegistrationCommand команда регистрации узла type NodeRegistrationCommand struct { Type string `json:"type"` Node NodeInfo `json:"node,omitempty"` NodeID string `json:"node_id,omitempty"` Factor int32 `json:"factor,omitempty"` } // getLocalIP получает локальный IP адрес func getLocalIP() string { addrs, err := net.InterfaceAddrs() if err != nil { return "127.0.0.1" } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { return ipnet.IP.String() } } return "127.0.0.1" } // NewRaftCoordinator создаёт новый Raft координатор func NewRaftCoordinator(cfg *config.Config, logger *log.Logger) (*RaftCoordinator, error) { nodeIP := cfg.Cluster.NodeIP if nodeIP == "" || nodeIP == "0.0.0.0" { nodeIP = getLocalIP() } raftAddr := fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.RaftPort) logger.Debug(fmt.Sprintf("Creating Raft coordinator at %s", raftAddr)) singleNodeMode := len(cfg.Cluster.Nodes) <= 1 || cfg.Cluster.Bootstrap rc := &RaftCoordinator{ address: fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.NodePort), raftAddr: raftAddr, clusterName: cfg.Cluster.Name, logger: logger, config: cfg, stopChan: make(chan struct{}), leaderMonitor: make(chan bool, 1), replicationEnabled: cfg.Replication.Enabled, masterMasterEnabled: cfg.Replication.MasterMaster, syncReplication: cfg.Replication.SyncReplication, singleNodeMode: singleNodeMode, localNodeInfo: &NodeInfo{ ID: fmt.Sprintf("%s-%s", cfg.Cluster.Name, nodeIP), IP: nodeIP, Port: cfg.Cluster.NodePort, Status: "active", LastSeen: time.Now().Unix(), }, } rc.replicationFactor.Store(int32(3)) rc.fsm = &RaftFSM{ state: &RaftClusterState{ Nodes: make(map[string]*NodeInfo), }, logger: logger, } if singleNodeMode { rc.fsm.state.mu.Lock() rc.fsm.state.Nodes[rc.localNodeInfo.ID] = rc.localNodeInfo rc.fsm.state.mu.Unlock() rc.isLeader.Store(true) logger.Debug(fmt.Sprintf("Single-node mode: local node added to state: %s", rc.localNodeInfo.ID)) } raftConfig := raft.DefaultConfig() raftConfig.LocalID = raft.ServerID(rc.localNodeInfo.ID) raftConfig.HeartbeatTimeout = 1000 * time.Millisecond raftConfig.ElectionTimeout = 1000 * time.Millisecond raftConfig.CommitTimeout = 500 * time.Millisecond raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond if singleNodeMode { raftConfig.HeartbeatTimeout = 500 * time.Millisecond raftConfig.ElectionTimeout = 500 * time.Millisecond raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond raftConfig.LogOutput = io.Discard logger.Debug("Running in single-node mode (warnings suppressed)") } else { raftConfig.LogOutput = os.Stderr } dataDir := cfg.Cluster.RaftDataDir if err := os.MkdirAll(dataDir, 0755); err != nil { return nil, fmt.Errorf("failed to create raft data dir: %v", err) } logger.Debug(fmt.Sprintf("Raft data directory: %s", dataDir)) // Используем встроенные хранилища вместо boltDB rc.logStore = NewInmemStore(filepath.Join(dataDir, "raft-log.json")) rc.stableStore = NewInmemStore(filepath.Join(dataDir, "raft-stable.json")) // Создаём снапшот хранилище snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr) if err != nil { return nil, fmt.Errorf("failed to create snapshot store: %v", err) } // Создаём транспорт transport, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, os.Stderr) if err != nil { return nil, fmt.Errorf("failed to create transport: %v", err) } // Создаём Raft инстанс - используем оба хранилища r, err := raft.NewRaft(raftConfig, rc.fsm, rc.logStore, rc.stableStore, snapshotStore, transport) if err != nil { return nil, fmt.Errorf("failed to create raft: %v", err) } rc.raft = r time.Sleep(500 * time.Millisecond) bootstrapPath := filepath.Join(dataDir, "raft-log.json") _, statErr := os.Stat(bootstrapPath) needsBootstrap := os.IsNotExist(statErr) if needsBootstrap && singleNodeMode { logger.Debug("Bootstrapping single-node cluster...") configuration := raft.Configuration{ Servers: []raft.Server{ { ID: raftConfig.LocalID, Address: transport.LocalAddr(), }, }, } future := r.BootstrapCluster(configuration) if err := future.Error(); err != nil { logger.Warn(fmt.Sprintf("Bootstrap error: %v", err)) } else { logger.Debug("Single-node cluster bootstrapped successfully") } time.Sleep(1 * time.Second) } else if needsBootstrap && len(cfg.Cluster.Nodes) > 1 { logger.Debug("Bootstrapping multi-node cluster...") servers := make([]raft.Server, 0, len(cfg.Cluster.Nodes)) for i, nodeAddr := range cfg.Cluster.Nodes { serverID := raft.ServerID(fmt.Sprintf("%s-node%d", rc.clusterName, i+1)) servers = append(servers, raft.Server{ ID: serverID, Address: raft.ServerAddress(nodeAddr), }) } configuration := raft.Configuration{ Servers: servers, } future := r.BootstrapCluster(configuration) if err := future.Error(); err != nil { logger.Warn(fmt.Sprintf("Bootstrap error: %v", err)) } else { logger.Debug("Multi-node cluster bootstrapped successfully") } go rc.monitorLeadership() logger.Debug("Waiting for leader election...") timeout := time.After(5 * time.Second) leaderElected := false for !leaderElected { select { case isLeader := <-rc.leaderMonitor: if isLeader { leaderElected = true rc.isLeader.Store(true) logger.Debug("This node is now the cluster leader") } case <-timeout: logger.Warn("Leader election timeout") leaderElected = true } } } else { logger.Debug("Existing Raft state found, joining cluster...") go rc.monitorLeadership() if !singleNodeMode { time.Sleep(1 * time.Second) if r.State() == raft.Leader { rc.isLeader.Store(true) logger.Debug("This node is the cluster leader") } } } logger.Debug(fmt.Sprintf("Raft coordinator started at %s, IsLeader: %v, SingleNodeMode: %v", raftAddr, rc.isLeader.Load(), singleNodeMode)) return rc, nil } // monitorLeadership отслеживает изменения лидера func (rc *RaftCoordinator) monitorLeadership() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() wasLeader := false for { select { case <-rc.stopChan: return case <-ticker.C: if rc.raft == nil { continue } isLeader := rc.raft.State() == raft.Leader if isLeader != wasLeader { wasLeader = isLeader select { case rc.leaderMonitor <- isLeader: default: } if isLeader { rc.isLeader.Store(true) rc.logger.Debug("Leadership acquired") } else { rc.isLeader.Store(false) rc.logger.Debug("Leadership lost") } } } } } // Apply применяет команду к FSM func (f *RaftFSM) Apply(log *raft.Log) interface{} { var cmd NodeRegistrationCommand if err := json.Unmarshal(log.Data, &cmd); err != nil { f.logger.Error(fmt.Sprintf("Failed to unmarshal raft command: %v", err)) return err } f.state.mu.Lock() defer f.state.mu.Unlock() switch cmd.Type { case "register": f.state.Nodes[cmd.Node.ID] = &cmd.Node f.logger.Debug(fmt.Sprintf("Raft: Node registered: %s", cmd.Node.ID)) case "remove": delete(f.state.Nodes, cmd.NodeID) f.logger.Debug(fmt.Sprintf("Raft: Node removed: %s", cmd.NodeID)) case "set_replication_factor": f.state.ReplicationFactor = cmd.Factor f.logger.Debug(fmt.Sprintf("Raft: Replication factor set to %d", cmd.Factor)) } return nil } // Snapshot реализует создание снапшота func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) { f.state.mu.RLock() defer f.state.mu.RUnlock() stateCopy := &RaftClusterState{ Nodes: make(map[string]*NodeInfo), ReplicationFactor: f.state.ReplicationFactor, } for k, v := range f.state.Nodes { stateCopy.Nodes[k] = v } return &RaftSnapshot{state: stateCopy}, nil } // Restore восстанавливает состояние из снапшота func (f *RaftFSM) Restore(snapshot io.ReadCloser) error { defer snapshot.Close() var state RaftClusterState decoder := json.NewDecoder(snapshot) if err := decoder.Decode(&state); err != nil { return err } f.state.mu.Lock() defer f.state.mu.Unlock() f.state.Nodes = state.Nodes f.state.ReplicationFactor = state.ReplicationFactor return nil } // RaftSnapshot реализует интерфейс FSMSnapshot type RaftSnapshot struct { state *RaftClusterState } // Persist сохраняет снапшот func (s *RaftSnapshot) Persist(sink raft.SnapshotSink) error { err := func() error { data, err := json.Marshal(s.state) if err != nil { return err } if _, err := sink.Write(data); err != nil { return err } return sink.Close() }() if err != nil { sink.Cancel() return err } return nil } // Release освобождает ресурсы func (s *RaftSnapshot) Release() {} // RegisterNode регистрирует узел через Raft func (rc *RaftCoordinator) RegisterNode(node *Node) error { nodeInfo := &NodeInfo{ ID: node.ID, IP: node.IP, Port: node.Port, Status: "active", LastSeen: time.Now().Unix(), } if rc.singleNodeMode { rc.logger.Debug("Single-node mode: registering node without Raft consensus") rc.nodes.Store(node.ID, nodeInfo) rc.fsm.state.mu.Lock() rc.fsm.state.Nodes[node.ID] = nodeInfo rc.fsm.state.mu.Unlock() rc.logger.Debug(fmt.Sprintf("Node registered locally in single-node mode: %s", node.ID)) return nil } if !rc.IsLeader() { leader := rc.GetLeader() if leader != nil { rc.logger.Warn(fmt.Sprintf("Current node is not leader. Leader is %s:%d", leader.IP, leader.Port)) return fmt.Errorf("node is not the leader. Please connect to leader at %s:%d", leader.IP, leader.Port) } return fmt.Errorf("node is not the leader and no leader found") } cmd := NodeRegistrationCommand{ Type: "register", Node: *nodeInfo, } data, err := json.Marshal(cmd) if err != nil { return err } future := rc.raft.Apply(data, 5*time.Second) if err := future.Error(); err != nil { return fmt.Errorf("failed to register node via raft: %v", err) } rc.nodes.Store(node.ID, nodeInfo) rc.logger.Debug(fmt.Sprintf("Node registered via Raft: %s", node.ID)) return nil } // RemoveNode удаляет узел через Raft func (rc *RaftCoordinator) RemoveNode(nodeID string) error { if rc.singleNodeMode { rc.nodes.Delete(nodeID) rc.fsm.state.mu.Lock() delete(rc.fsm.state.Nodes, nodeID) rc.fsm.state.mu.Unlock() rc.logger.Debug(fmt.Sprintf("Node removed locally in single-node mode: %s", nodeID)) return nil } if !rc.IsLeader() { return fmt.Errorf("node is not the leader") } cmd := NodeRegistrationCommand{ Type: "remove", NodeID: nodeID, } data, err := json.Marshal(cmd) if err != nil { return err } future := rc.raft.Apply(data, 5*time.Second) if err := future.Error(); err != nil { return fmt.Errorf("failed to remove node via raft: %v", err) } rc.nodes.Delete(nodeID) rc.logger.Debug(fmt.Sprintf("Node removed via Raft: %s", nodeID)) return nil } // GetActiveNodes возвращает активные узлы func (rc *RaftCoordinator) GetActiveNodes() []*NodeInfo { nodes := make([]*NodeInfo, 0) now := time.Now().Unix() state := rc.fsm.state state.mu.RLock() defer state.mu.RUnlock() for _, nodeInfo := range state.Nodes { if now-nodeInfo.LastSeen < 30 { nodes = append(nodes, nodeInfo) } } if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil { nodes = append(nodes, rc.localNodeInfo) } return nodes } // GetAllNodes возвращает все узлы func (rc *RaftCoordinator) GetAllNodes() []*NodeInfo { state := rc.fsm.state state.mu.RLock() defer state.mu.RUnlock() nodes := make([]*NodeInfo, 0, len(state.Nodes)) for _, node := range state.Nodes { nodes = append(nodes, node) } if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil { nodes = append(nodes, rc.localNodeInfo) } return nodes } // GetLeader возвращает лидера func (rc *RaftCoordinator) GetLeader() *NodeInfo { if rc.singleNodeMode { return rc.localNodeInfo } leaderAddr := rc.raft.Leader() if leaderAddr == "" { return nil } state := rc.fsm.state state.mu.RLock() defer state.mu.RUnlock() for _, node := range state.Nodes { nodeAddr := fmt.Sprintf("%s:%d", node.IP, node.Port) if nodeAddr == string(leaderAddr) { return node } } return nil } // IsLeader проверяет, является ли текущий узел лидером func (rc *RaftCoordinator) IsLeader() bool { if rc.singleNodeMode { return true } return rc.isLeader.Load() } // SendHeartbeat обновляет heartbeat узла func (rc *RaftCoordinator) SendHeartbeat(nodeID string) { if val, ok := rc.nodes.Load(nodeID); ok { nodeInfo := val.(*NodeInfo) nodeInfo.LastSeen = time.Now().Unix() rc.nodes.Store(nodeID, nodeInfo) } rc.fsm.state.mu.Lock() if nodeInfo, ok := rc.fsm.state.Nodes[nodeID]; ok { nodeInfo.LastSeen = time.Now().Unix() } rc.fsm.state.mu.Unlock() } // GetClusterStatus возвращает статус кластера func (rc *RaftCoordinator) GetClusterStatus() *ClusterStatus { nodes := rc.GetAllNodes() activeNodes := rc.GetActiveNodes() syncingNodes := 0 for _, node := range nodes { if node.Status == "syncing" { syncingNodes++ } } leader := rc.GetLeader() leaderID := "" if leader != nil { leaderID = leader.ID } return &ClusterStatus{ Name: rc.clusterName, TotalNodes: len(nodes), ActiveNodes: len(activeNodes), SyncingNodes: syncingNodes, FailedNodes: len(nodes) - len(activeNodes), ReplicationFactor: int(rc.replicationFactor.Load()), LeaderID: leaderID, Health: rc.calculateHealth(), } } // calculateHealth вычисляет здоровье кластера func (rc *RaftCoordinator) calculateHealth() string { activeNodes := rc.GetActiveNodes() totalNodes := rc.GetAllNodes() if len(totalNodes) == 0 { return "critical" } ratio := float64(len(activeNodes)) / float64(len(totalNodes)) if ratio >= 0.8 { return "healthy" } else if ratio >= 0.5 { return "degraded" } return "critical" } // GetReplicationFactor возвращает фактор репликации func (rc *RaftCoordinator) GetReplicationFactor() int { return int(rc.replicationFactor.Load()) } // SetReplicationFactor устанавливает фактор репликации через Raft func (rc *RaftCoordinator) SetReplicationFactor(factor int) error { if !rc.IsLeader() { return fmt.Errorf("node is not the leader") } cmd := NodeRegistrationCommand{ Type: "set_replication_factor", Factor: int32(factor), } data, err := json.Marshal(cmd) if err != nil { return err } future := rc.raft.Apply(data, 5*time.Second) if err := future.Error(); err != nil { return fmt.Errorf("failed to set replication factor via raft: %v", err) } rc.replicationFactor.Store(int32(factor)) rc.logger.Debug(fmt.Sprintf("Replication factor set to %d via Raft", factor)) return nil } // GetClusterHealth возвращает детальную информацию о здоровье кластера func (rc *RaftCoordinator) GetClusterHealth() *ClusterHealth { health := &ClusterHealth{ Nodes: make(map[string]*NodeHealth), OverallScore: 100.0, Recommendations: "", } now := time.Now().Unix() state := rc.fsm.state state.mu.RLock() defer state.mu.RUnlock() for nodeID, nodeInfo := range state.Nodes { nodeHealth := &NodeHealth{ Status: nodeInfo.Status, LatencyMs: 0, LastCheck: now, } if now-nodeInfo.LastSeen > 30 { nodeHealth.Status = "offline" health.OverallScore -= 10 } else if nodeInfo.Status == "syncing" { health.OverallScore -= 5 } health.Nodes[nodeID] = nodeHealth } if health.OverallScore < 50 { health.Recommendations = "Critical: Check network connectivity and node health immediately" } else if health.OverallScore < 80 { health.Recommendations = "Warning: Some nodes are offline or syncing, consider adding more nodes" } else { health.Recommendations = "Cluster is healthy, all systems operational" } return health } // IsReplicationEnabled возвращает статус репликации func (rc *RaftCoordinator) IsReplicationEnabled() bool { return rc.replicationEnabled } // IsMasterMasterEnabled возвращает статус мастер-мастер репликации func (rc *RaftCoordinator) IsMasterMasterEnabled() bool { return rc.masterMasterEnabled } // IsSyncReplicationEnabled возвращает статус синхронной репликации func (rc *RaftCoordinator) IsSyncReplicationEnabled() bool { return rc.syncReplication } // Stop останавливает координатор func (rc *RaftCoordinator) Stop() { close(rc.stopChan) if rc.raft != nil { rc.raft.Shutdown() } rc.logger.Debug("Raft coordinator stopped") }