From 5e0189bc3c425a28bb269698aa73313eb2517e7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 17 May 2026 14:31:44 +0000 Subject: [PATCH] Upload files to "internal/cluster" --- internal/cluster/raft_coordinator.go | 888 +++++++++++++++++++++++++++ 1 file changed, 888 insertions(+) create mode 100644 internal/cluster/raft_coordinator.go diff --git a/internal/cluster/raft_coordinator.go b/internal/cluster/raft_coordinator.go new file mode 100644 index 0000000..f0afdb8 --- /dev/null +++ b/internal/cluster/raft_coordinator.go @@ -0,0 +1,888 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/cluster/raft_coordinator.go +// Назначение: Реализация координатора распределённого кластера на основе Raft консенсус-алгоритма. +// Обеспечивает управление узлами кластера, выборы лидера, репликацию данных и отказоустойчивость. +// Поддерживает как одноузловой режим работы, так и многокластерную конфигурацию с синхронной/асинхронной репликацией. + + +package cluster + +import ( + "encoding/json" + "fmt" + "io" + "net" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/raft" + "futriis/internal/log" + "futriis/internal/config" +) + +// InmemStore реализует встроенное файловое хранилище для Raft +// Реализует интерфейсы raft.LogStore и raft.StableStore +type InmemStore struct { + mu sync.RWMutex + data map[string][]byte + path string +} + +// NewInmemStore создаёт новое хранилище +func NewInmemStore(path string) *InmemStore { + store := &InmemStore{ + data: make(map[string][]byte), + path: path, + } + // Пытаемся загрузить существующие данные + store.load() + return store +} + +// load загружает данные из файла +func (s *InmemStore) load() { + if s.path == "" { + return + } + data, err := os.ReadFile(s.path) + if err != nil { + return + } + json.Unmarshal(data, &s.data) +} + +// save сохраняет данные в файл +func (s *InmemStore) save() { + if s.path == "" { + return + } + data, _ := json.Marshal(s.data) + os.WriteFile(s.path, data, 0644) +} + +// ==================== Реализация raft.LogStore ==================== + +// FirstIndex возвращает первый индекс в логе +func (s *InmemStore) FirstIndex() (uint64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var first uint64 = 0 + for key := range s.data { + var idx uint64 + if _, err := fmt.Sscanf(key, "log-%d", &idx); err == nil { + if first == 0 || idx < first { + first = idx + } + } + } + return first, nil +} + +// LastIndex возвращает последний индекс в логе +func (s *InmemStore) LastIndex() (uint64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var last uint64 = 0 + for key := range s.data { + var idx uint64 + if _, err := fmt.Sscanf(key, "log-%d", &idx); err == nil { + if idx > last { + last = idx + } + } + } + return last, nil +} + +// GetLog получает лог по индексу +func (s *InmemStore) GetLog(idx uint64, log *raft.Log) error { + s.mu.RLock() + defer s.mu.RUnlock() + + key := fmt.Sprintf("log-%d", idx) + data, ok := s.data[key] + if !ok { + return raft.ErrLogNotFound + } + + return json.Unmarshal(data, log) +} + +// StoreLog сохраняет один лог +func (s *InmemStore) StoreLog(log *raft.Log) error { + return s.StoreLogs([]*raft.Log{log}) +} + +// StoreLogs сохраняет несколько логов +func (s *InmemStore) StoreLogs(logs []*raft.Log) error { + s.mu.Lock() + defer s.mu.Unlock() + + for _, log := range logs { + key := fmt.Sprintf("log-%d", log.Index) + data, err := json.Marshal(log) + if err != nil { + return err + } + s.data[key] = data + } + s.save() + return nil +} + +// DeleteRange удаляет диапазон логов +func (s *InmemStore) DeleteRange(min, max uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + + for idx := min; idx <= max; idx++ { + key := fmt.Sprintf("log-%d", idx) + delete(s.data, key) + } + s.save() + return nil +} + +// ==================== Реализация raft.StableStore ==================== + +// Get реализует raft.StableStore +func (s *InmemStore) Get(key []byte) ([]byte, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + val, ok := s.data[string(key)] + if !ok { + return nil, nil + } + return val, nil +} + +// Set реализует raft.StableStore +func (s *InmemStore) Set(key []byte, val []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.data[string(key)] = val + s.save() + return nil +} + +// SetUint64 реализует raft.StableStore +func (s *InmemStore) SetUint64(key []byte, val uint64) error { + return s.Set(key, []byte(fmt.Sprintf("%d", val))) +} + +// GetUint64 реализует raft.StableStore +func (s *InmemStore) GetUint64(key []byte) (uint64, error) { + val, err := s.Get(key) + if err != nil { + return 0, err + } + if val == nil { + return 0, nil + } + var result uint64 + fmt.Sscanf(string(val), "%d", &result) + return result, nil +} + +// RaftClusterState представляет состояние кластера для Raft FSM +type RaftClusterState struct { + Nodes map[string]*NodeInfo `json:"nodes"` + ReplicationFactor int32 `json:"replication_factor"` + mu sync.RWMutex +} + +// RaftCoordinator реализует координацию кластера через Raft +type RaftCoordinator struct { + raft *raft.Raft + fsm *RaftFSM + address string + raftAddr string + clusterName string + logger *log.Logger + config *config.Config + stopChan chan struct{} + nodes sync.Map + replicationFactor atomic.Int32 + replicationEnabled bool + masterMasterEnabled bool + syncReplication bool + isLeader atomic.Bool + leaderMonitor chan bool + singleNodeMode bool + localNodeInfo *NodeInfo + logStore *InmemStore + stableStore *InmemStore +} + +// RaftFSM реализует конечный автомат для Raft +type RaftFSM struct { + state *RaftClusterState + logger *log.Logger +} + +// NodeRegistrationCommand команда регистрации узла +type NodeRegistrationCommand struct { + Type string `json:"type"` + Node NodeInfo `json:"node,omitempty"` + NodeID string `json:"node_id,omitempty"` + Factor int32 `json:"factor,omitempty"` +} + +// getLocalIP получает локальный IP адрес +func getLocalIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil { + return ipnet.IP.String() + } + } + return "127.0.0.1" +} + +// NewRaftCoordinator создаёт новый Raft координатор +func NewRaftCoordinator(cfg *config.Config, logger *log.Logger) (*RaftCoordinator, error) { + nodeIP := cfg.Cluster.NodeIP + if nodeIP == "" || nodeIP == "0.0.0.0" { + nodeIP = getLocalIP() + } + + raftAddr := fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.RaftPort) + + logger.Debug(fmt.Sprintf("Creating Raft coordinator at %s", raftAddr)) + + singleNodeMode := len(cfg.Cluster.Nodes) <= 1 || cfg.Cluster.Bootstrap + + rc := &RaftCoordinator{ + address: fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.NodePort), + raftAddr: raftAddr, + clusterName: cfg.Cluster.Name, + logger: logger, + config: cfg, + stopChan: make(chan struct{}), + leaderMonitor: make(chan bool, 1), + replicationEnabled: cfg.Replication.Enabled, + masterMasterEnabled: cfg.Replication.MasterMaster, + syncReplication: cfg.Replication.SyncReplication, + singleNodeMode: singleNodeMode, + localNodeInfo: &NodeInfo{ + ID: fmt.Sprintf("%s-%s", cfg.Cluster.Name, nodeIP), + IP: nodeIP, + Port: cfg.Cluster.NodePort, + Status: "active", + LastSeen: time.Now().Unix(), + }, + } + rc.replicationFactor.Store(int32(3)) + + rc.fsm = &RaftFSM{ + state: &RaftClusterState{ + Nodes: make(map[string]*NodeInfo), + }, + logger: logger, + } + + if singleNodeMode { + rc.fsm.state.mu.Lock() + rc.fsm.state.Nodes[rc.localNodeInfo.ID] = rc.localNodeInfo + rc.fsm.state.mu.Unlock() + rc.isLeader.Store(true) + logger.Debug(fmt.Sprintf("Single-node mode: local node added to state: %s", rc.localNodeInfo.ID)) + } + + raftConfig := raft.DefaultConfig() + raftConfig.LocalID = raft.ServerID(rc.localNodeInfo.ID) + raftConfig.HeartbeatTimeout = 1000 * time.Millisecond + raftConfig.ElectionTimeout = 1000 * time.Millisecond + raftConfig.CommitTimeout = 500 * time.Millisecond + raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond + + if singleNodeMode { + raftConfig.HeartbeatTimeout = 500 * time.Millisecond + raftConfig.ElectionTimeout = 500 * time.Millisecond + raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond + raftConfig.LogOutput = io.Discard + logger.Debug("Running in single-node mode (warnings suppressed)") + } else { + raftConfig.LogOutput = os.Stderr + } + + dataDir := cfg.Cluster.RaftDataDir + if err := os.MkdirAll(dataDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create raft data dir: %v", err) + } + + logger.Debug(fmt.Sprintf("Raft data directory: %s", dataDir)) + + // Используем встроенные хранилища вместо boltDB + rc.logStore = NewInmemStore(filepath.Join(dataDir, "raft-log.json")) + rc.stableStore = NewInmemStore(filepath.Join(dataDir, "raft-stable.json")) + + // Создаём снапшот хранилище + snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr) + if err != nil { + return nil, fmt.Errorf("failed to create snapshot store: %v", err) + } + + // Создаём транспорт + transport, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, os.Stderr) + if err != nil { + return nil, fmt.Errorf("failed to create transport: %v", err) + } + + // Создаём Raft инстанс - используем оба хранилища + r, err := raft.NewRaft(raftConfig, rc.fsm, rc.logStore, rc.stableStore, snapshotStore, transport) + if err != nil { + return nil, fmt.Errorf("failed to create raft: %v", err) + } + + rc.raft = r + + time.Sleep(500 * time.Millisecond) + + bootstrapPath := filepath.Join(dataDir, "raft-log.json") + _, statErr := os.Stat(bootstrapPath) + needsBootstrap := os.IsNotExist(statErr) + + if needsBootstrap && singleNodeMode { + logger.Debug("Bootstrapping single-node cluster...") + + configuration := raft.Configuration{ + Servers: []raft.Server{ + { + ID: raftConfig.LocalID, + Address: transport.LocalAddr(), + }, + }, + } + + future := r.BootstrapCluster(configuration) + if err := future.Error(); err != nil { + logger.Warn(fmt.Sprintf("Bootstrap error: %v", err)) + } else { + logger.Debug("Single-node cluster bootstrapped successfully") + } + + time.Sleep(1 * time.Second) + + } else if needsBootstrap && len(cfg.Cluster.Nodes) > 1 { + logger.Debug("Bootstrapping multi-node cluster...") + + servers := make([]raft.Server, 0, len(cfg.Cluster.Nodes)) + for i, nodeAddr := range cfg.Cluster.Nodes { + serverID := raft.ServerID(fmt.Sprintf("%s-node%d", rc.clusterName, i+1)) + servers = append(servers, raft.Server{ + ID: serverID, + Address: raft.ServerAddress(nodeAddr), + }) + } + + configuration := raft.Configuration{ + Servers: servers, + } + + future := r.BootstrapCluster(configuration) + if err := future.Error(); err != nil { + logger.Warn(fmt.Sprintf("Bootstrap error: %v", err)) + } else { + logger.Debug("Multi-node cluster bootstrapped successfully") + } + + go rc.monitorLeadership() + + logger.Debug("Waiting for leader election...") + timeout := time.After(5 * time.Second) + leaderElected := false + + for !leaderElected { + select { + case isLeader := <-rc.leaderMonitor: + if isLeader { + leaderElected = true + rc.isLeader.Store(true) + logger.Debug("This node is now the cluster leader") + } + case <-timeout: + logger.Warn("Leader election timeout") + leaderElected = true + } + } + } else { + logger.Debug("Existing Raft state found, joining cluster...") + go rc.monitorLeadership() + + if !singleNodeMode { + time.Sleep(1 * time.Second) + if r.State() == raft.Leader { + rc.isLeader.Store(true) + logger.Debug("This node is the cluster leader") + } + } + } + + logger.Debug(fmt.Sprintf("Raft coordinator started at %s, IsLeader: %v, SingleNodeMode: %v", raftAddr, rc.isLeader.Load(), singleNodeMode)) + + return rc, nil +} + +// monitorLeadership отслеживает изменения лидера +func (rc *RaftCoordinator) monitorLeadership() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + wasLeader := false + + for { + select { + case <-rc.stopChan: + return + case <-ticker.C: + if rc.raft == nil { + continue + } + isLeader := rc.raft.State() == raft.Leader + if isLeader != wasLeader { + wasLeader = isLeader + select { + case rc.leaderMonitor <- isLeader: + default: + } + if isLeader { + rc.isLeader.Store(true) + rc.logger.Debug("Leadership acquired") + } else { + rc.isLeader.Store(false) + rc.logger.Debug("Leadership lost") + } + } + } + } +} + +// Apply применяет команду к FSM +func (f *RaftFSM) Apply(log *raft.Log) interface{} { + var cmd NodeRegistrationCommand + if err := json.Unmarshal(log.Data, &cmd); err != nil { + f.logger.Error(fmt.Sprintf("Failed to unmarshal raft command: %v", err)) + return err + } + + f.state.mu.Lock() + defer f.state.mu.Unlock() + + switch cmd.Type { + case "register": + f.state.Nodes[cmd.Node.ID] = &cmd.Node + f.logger.Debug(fmt.Sprintf("Raft: Node registered: %s", cmd.Node.ID)) + case "remove": + delete(f.state.Nodes, cmd.NodeID) + f.logger.Debug(fmt.Sprintf("Raft: Node removed: %s", cmd.NodeID)) + case "set_replication_factor": + f.state.ReplicationFactor = cmd.Factor + f.logger.Debug(fmt.Sprintf("Raft: Replication factor set to %d", cmd.Factor)) + } + + return nil +} + +// Snapshot реализует создание снапшота +func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) { + f.state.mu.RLock() + defer f.state.mu.RUnlock() + + stateCopy := &RaftClusterState{ + Nodes: make(map[string]*NodeInfo), + ReplicationFactor: f.state.ReplicationFactor, + } + for k, v := range f.state.Nodes { + stateCopy.Nodes[k] = v + } + + return &RaftSnapshot{state: stateCopy}, nil +} + +// Restore восстанавливает состояние из снапшота +func (f *RaftFSM) Restore(snapshot io.ReadCloser) error { + defer snapshot.Close() + + var state RaftClusterState + decoder := json.NewDecoder(snapshot) + if err := decoder.Decode(&state); err != nil { + return err + } + + f.state.mu.Lock() + defer f.state.mu.Unlock() + f.state.Nodes = state.Nodes + f.state.ReplicationFactor = state.ReplicationFactor + + return nil +} + +// RaftSnapshot реализует интерфейс FSMSnapshot +type RaftSnapshot struct { + state *RaftClusterState +} + +// Persist сохраняет снапшот +func (s *RaftSnapshot) Persist(sink raft.SnapshotSink) error { + err := func() error { + data, err := json.Marshal(s.state) + if err != nil { + return err + } + + if _, err := sink.Write(data); err != nil { + return err + } + + return sink.Close() + }() + + if err != nil { + sink.Cancel() + return err + } + + return nil +} + +// Release освобождает ресурсы +func (s *RaftSnapshot) Release() {} + +// RegisterNode регистрирует узел через Raft +func (rc *RaftCoordinator) RegisterNode(node *Node) error { + nodeInfo := &NodeInfo{ + ID: node.ID, + IP: node.IP, + Port: node.Port, + Status: "active", + LastSeen: time.Now().Unix(), + } + + if rc.singleNodeMode { + rc.logger.Debug("Single-node mode: registering node without Raft consensus") + + rc.nodes.Store(node.ID, nodeInfo) + + rc.fsm.state.mu.Lock() + rc.fsm.state.Nodes[node.ID] = nodeInfo + rc.fsm.state.mu.Unlock() + + rc.logger.Debug(fmt.Sprintf("Node registered locally in single-node mode: %s", node.ID)) + return nil + } + + if !rc.IsLeader() { + leader := rc.GetLeader() + if leader != nil { + rc.logger.Warn(fmt.Sprintf("Current node is not leader. Leader is %s:%d", leader.IP, leader.Port)) + return fmt.Errorf("node is not the leader. Please connect to leader at %s:%d", leader.IP, leader.Port) + } + return fmt.Errorf("node is not the leader and no leader found") + } + + cmd := NodeRegistrationCommand{ + Type: "register", + Node: *nodeInfo, + } + + data, err := json.Marshal(cmd) + if err != nil { + return err + } + + future := rc.raft.Apply(data, 5*time.Second) + if err := future.Error(); err != nil { + return fmt.Errorf("failed to register node via raft: %v", err) + } + + rc.nodes.Store(node.ID, nodeInfo) + + rc.logger.Debug(fmt.Sprintf("Node registered via Raft: %s", node.ID)) + return nil +} + +// RemoveNode удаляет узел через Raft +func (rc *RaftCoordinator) RemoveNode(nodeID string) error { + if rc.singleNodeMode { + rc.nodes.Delete(nodeID) + rc.fsm.state.mu.Lock() + delete(rc.fsm.state.Nodes, nodeID) + rc.fsm.state.mu.Unlock() + rc.logger.Debug(fmt.Sprintf("Node removed locally in single-node mode: %s", nodeID)) + return nil + } + + if !rc.IsLeader() { + return fmt.Errorf("node is not the leader") + } + + cmd := NodeRegistrationCommand{ + Type: "remove", + NodeID: nodeID, + } + + data, err := json.Marshal(cmd) + if err != nil { + return err + } + + future := rc.raft.Apply(data, 5*time.Second) + if err := future.Error(); err != nil { + return fmt.Errorf("failed to remove node via raft: %v", err) + } + + rc.nodes.Delete(nodeID) + rc.logger.Debug(fmt.Sprintf("Node removed via Raft: %s", nodeID)) + return nil +} + +// GetActiveNodes возвращает активные узлы +func (rc *RaftCoordinator) GetActiveNodes() []*NodeInfo { + nodes := make([]*NodeInfo, 0) + now := time.Now().Unix() + + state := rc.fsm.state + state.mu.RLock() + defer state.mu.RUnlock() + + for _, nodeInfo := range state.Nodes { + if now-nodeInfo.LastSeen < 30 { + nodes = append(nodes, nodeInfo) + } + } + + if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil { + nodes = append(nodes, rc.localNodeInfo) + } + + return nodes +} + +// GetAllNodes возвращает все узлы +func (rc *RaftCoordinator) GetAllNodes() []*NodeInfo { + state := rc.fsm.state + state.mu.RLock() + defer state.mu.RUnlock() + + nodes := make([]*NodeInfo, 0, len(state.Nodes)) + for _, node := range state.Nodes { + nodes = append(nodes, node) + } + + if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil { + nodes = append(nodes, rc.localNodeInfo) + } + + return nodes +} + +// GetLeader возвращает лидера +func (rc *RaftCoordinator) GetLeader() *NodeInfo { + if rc.singleNodeMode { + return rc.localNodeInfo + } + + leaderAddr := rc.raft.Leader() + if leaderAddr == "" { + return nil + } + + state := rc.fsm.state + state.mu.RLock() + defer state.mu.RUnlock() + + for _, node := range state.Nodes { + nodeAddr := fmt.Sprintf("%s:%d", node.IP, node.Port) + if nodeAddr == string(leaderAddr) { + return node + } + } + return nil +} + +// IsLeader проверяет, является ли текущий узел лидером +func (rc *RaftCoordinator) IsLeader() bool { + if rc.singleNodeMode { + return true + } + return rc.isLeader.Load() +} + +// SendHeartbeat обновляет heartbeat узла +func (rc *RaftCoordinator) SendHeartbeat(nodeID string) { + if val, ok := rc.nodes.Load(nodeID); ok { + nodeInfo := val.(*NodeInfo) + nodeInfo.LastSeen = time.Now().Unix() + rc.nodes.Store(nodeID, nodeInfo) + } + + rc.fsm.state.mu.Lock() + if nodeInfo, ok := rc.fsm.state.Nodes[nodeID]; ok { + nodeInfo.LastSeen = time.Now().Unix() + } + rc.fsm.state.mu.Unlock() +} + +// GetClusterStatus возвращает статус кластера +func (rc *RaftCoordinator) GetClusterStatus() *ClusterStatus { + nodes := rc.GetAllNodes() + activeNodes := rc.GetActiveNodes() + + syncingNodes := 0 + for _, node := range nodes { + if node.Status == "syncing" { + syncingNodes++ + } + } + + leader := rc.GetLeader() + leaderID := "" + if leader != nil { + leaderID = leader.ID + } + + return &ClusterStatus{ + Name: rc.clusterName, + TotalNodes: len(nodes), + ActiveNodes: len(activeNodes), + SyncingNodes: syncingNodes, + FailedNodes: len(nodes) - len(activeNodes), + ReplicationFactor: int(rc.replicationFactor.Load()), + LeaderID: leaderID, + Health: rc.calculateHealth(), + } +} + +// calculateHealth вычисляет здоровье кластера +func (rc *RaftCoordinator) calculateHealth() string { + activeNodes := rc.GetActiveNodes() + totalNodes := rc.GetAllNodes() + + if len(totalNodes) == 0 { + return "critical" + } + + ratio := float64(len(activeNodes)) / float64(len(totalNodes)) + if ratio >= 0.8 { + return "healthy" + } else if ratio >= 0.5 { + return "degraded" + } + return "critical" +} + +// GetReplicationFactor возвращает фактор репликации +func (rc *RaftCoordinator) GetReplicationFactor() int { + return int(rc.replicationFactor.Load()) +} + +// SetReplicationFactor устанавливает фактор репликации через Raft +func (rc *RaftCoordinator) SetReplicationFactor(factor int) error { + if !rc.IsLeader() { + return fmt.Errorf("node is not the leader") + } + + cmd := NodeRegistrationCommand{ + Type: "set_replication_factor", + Factor: int32(factor), + } + + data, err := json.Marshal(cmd) + if err != nil { + return err + } + + future := rc.raft.Apply(data, 5*time.Second) + if err := future.Error(); err != nil { + return fmt.Errorf("failed to set replication factor via raft: %v", err) + } + + rc.replicationFactor.Store(int32(factor)) + rc.logger.Debug(fmt.Sprintf("Replication factor set to %d via Raft", factor)) + return nil +} + +// GetClusterHealth возвращает детальную информацию о здоровье кластера +func (rc *RaftCoordinator) GetClusterHealth() *ClusterHealth { + health := &ClusterHealth{ + Nodes: make(map[string]*NodeHealth), + OverallScore: 100.0, + Recommendations: "", + } + + now := time.Now().Unix() + state := rc.fsm.state + state.mu.RLock() + defer state.mu.RUnlock() + + for nodeID, nodeInfo := range state.Nodes { + nodeHealth := &NodeHealth{ + Status: nodeInfo.Status, + LatencyMs: 0, + LastCheck: now, + } + + if now-nodeInfo.LastSeen > 30 { + nodeHealth.Status = "offline" + health.OverallScore -= 10 + } else if nodeInfo.Status == "syncing" { + health.OverallScore -= 5 + } + + health.Nodes[nodeID] = nodeHealth + } + + if health.OverallScore < 50 { + health.Recommendations = "Critical: Check network connectivity and node health immediately" + } else if health.OverallScore < 80 { + health.Recommendations = "Warning: Some nodes are offline or syncing, consider adding more nodes" + } else { + health.Recommendations = "Cluster is healthy, all systems operational" + } + + return health +} + +// IsReplicationEnabled возвращает статус репликации +func (rc *RaftCoordinator) IsReplicationEnabled() bool { + return rc.replicationEnabled +} + +// IsMasterMasterEnabled возвращает статус мастер-мастер репликации +func (rc *RaftCoordinator) IsMasterMasterEnabled() bool { + return rc.masterMasterEnabled +} + +// IsSyncReplicationEnabled возвращает статус синхронной репликации +func (rc *RaftCoordinator) IsSyncReplicationEnabled() bool { + return rc.syncReplication +} + +// Stop останавливает координатор +func (rc *RaftCoordinator) Stop() { + close(rc.stopChan) + if rc.raft != nil { + rc.raft.Shutdown() + } + rc.logger.Debug("Raft coordinator stopped") +}