Delete internal/cluster/raft_coordinator.go
This commit is contained in:
@@ -1,750 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2026 Safronov Grigorii
|
|
||||||
*
|
|
||||||
* Licensed under the CDDL, Version 1.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
*
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* https://opensource.org/licenses/CDDL-1.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
// Файл: internal/cluster/raft_coordinator.go
|
|
||||||
// Назначение: Реализация координатора распределённого кластера на основе Raft консенсус-алгоритма.
|
|
||||||
// Обеспечивает управление узлами кластера, выборы лидера, репликацию данных и отказоустойчивость.
|
|
||||||
// Поддерживает как одноузловой режим работы, так и многокластерную конфигурацию с синхронной/асинхронной репликацией.
|
|
||||||
|
|
||||||
package cluster
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/raft"
|
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
|
||||||
"futriis/internal/log"
|
|
||||||
"futriis/internal/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
// RaftClusterState представляет состояние кластера для Raft FSM
|
|
||||||
type RaftClusterState struct {
|
|
||||||
Nodes map[string]*NodeInfo `json:"nodes"`
|
|
||||||
ReplicationFactor int32 `json:"replication_factor"`
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftCoordinator реализует координацию кластера через Raft
|
|
||||||
type RaftCoordinator struct {
|
|
||||||
raft *raft.Raft
|
|
||||||
fsm *RaftFSM
|
|
||||||
address string
|
|
||||||
raftAddr string
|
|
||||||
clusterName string
|
|
||||||
logger *log.Logger
|
|
||||||
config *config.Config
|
|
||||||
stopChan chan struct{}
|
|
||||||
nodes sync.Map
|
|
||||||
replicationFactor atomic.Int32
|
|
||||||
replicationEnabled bool
|
|
||||||
masterMasterEnabled bool
|
|
||||||
syncReplication bool
|
|
||||||
isLeader atomic.Bool
|
|
||||||
leaderMonitor chan bool
|
|
||||||
singleNodeMode bool
|
|
||||||
localNodeInfo *NodeInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftFSM реализует конечный автомат для Raft
|
|
||||||
type RaftFSM struct {
|
|
||||||
state *RaftClusterState
|
|
||||||
logger *log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// NodeRegistrationCommand команда регистрации узла
|
|
||||||
type NodeRegistrationCommand struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
Node NodeInfo `json:"node,omitempty"`
|
|
||||||
NodeID string `json:"node_id,omitempty"`
|
|
||||||
Factor int32 `json:"factor,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// getLocalIP получает локальный IP адрес
|
|
||||||
func getLocalIP() string {
|
|
||||||
addrs, err := net.InterfaceAddrs()
|
|
||||||
if err != nil {
|
|
||||||
return "127.0.0.1"
|
|
||||||
}
|
|
||||||
for _, addr := range addrs {
|
|
||||||
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil {
|
|
||||||
return ipnet.IP.String()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return "127.0.0.1"
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRaftCoordinator создаёт новый Raft координатор
|
|
||||||
func NewRaftCoordinator(cfg *config.Config, logger *log.Logger) (*RaftCoordinator, error) {
|
|
||||||
// Используем IP из конфига или автоматически определяем
|
|
||||||
nodeIP := cfg.Cluster.NodeIP
|
|
||||||
if nodeIP == "" || nodeIP == "0.0.0.0" {
|
|
||||||
nodeIP = getLocalIP()
|
|
||||||
}
|
|
||||||
|
|
||||||
raftAddr := fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.RaftPort)
|
|
||||||
|
|
||||||
logger.Debug(fmt.Sprintf("Creating Raft coordinator at %s", raftAddr))
|
|
||||||
|
|
||||||
// Определяем одноузловой режим
|
|
||||||
singleNodeMode := len(cfg.Cluster.Nodes) <= 1 || cfg.Cluster.Bootstrap
|
|
||||||
|
|
||||||
rc := &RaftCoordinator{
|
|
||||||
address: fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.NodePort),
|
|
||||||
raftAddr: raftAddr,
|
|
||||||
clusterName: cfg.Cluster.Name,
|
|
||||||
logger: logger,
|
|
||||||
config: cfg,
|
|
||||||
stopChan: make(chan struct{}),
|
|
||||||
leaderMonitor: make(chan bool, 1),
|
|
||||||
replicationEnabled: cfg.Replication.Enabled,
|
|
||||||
masterMasterEnabled: cfg.Replication.MasterMaster,
|
|
||||||
syncReplication: cfg.Replication.SyncReplication,
|
|
||||||
singleNodeMode: singleNodeMode,
|
|
||||||
localNodeInfo: &NodeInfo{
|
|
||||||
ID: fmt.Sprintf("%s-%s", cfg.Cluster.Name, nodeIP),
|
|
||||||
IP: nodeIP,
|
|
||||||
Port: cfg.Cluster.NodePort,
|
|
||||||
Status: "active",
|
|
||||||
LastSeen: time.Now().Unix(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
rc.replicationFactor.Store(int32(3))
|
|
||||||
|
|
||||||
// Создаём FSM
|
|
||||||
rc.fsm = &RaftFSM{
|
|
||||||
state: &RaftClusterState{
|
|
||||||
Nodes: make(map[string]*NodeInfo),
|
|
||||||
},
|
|
||||||
logger: logger,
|
|
||||||
}
|
|
||||||
|
|
||||||
// В одноузловом режиме добавляем локальный узел в состояние
|
|
||||||
if singleNodeMode {
|
|
||||||
rc.fsm.state.mu.Lock()
|
|
||||||
rc.fsm.state.Nodes[rc.localNodeInfo.ID] = rc.localNodeInfo
|
|
||||||
rc.fsm.state.mu.Unlock()
|
|
||||||
rc.isLeader.Store(true)
|
|
||||||
logger.Debug(fmt.Sprintf("Single-node mode: local node added to state: %s", rc.localNodeInfo.ID))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Настраиваем Raft
|
|
||||||
raftConfig := raft.DefaultConfig()
|
|
||||||
raftConfig.LocalID = raft.ServerID(rc.localNodeInfo.ID)
|
|
||||||
raftConfig.HeartbeatTimeout = 1000 * time.Millisecond
|
|
||||||
raftConfig.ElectionTimeout = 1000 * time.Millisecond
|
|
||||||
raftConfig.CommitTimeout = 500 * time.Millisecond
|
|
||||||
raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond
|
|
||||||
|
|
||||||
// Для одноузлового кластера используем специальные настройки и подавляем предупреждения
|
|
||||||
if singleNodeMode {
|
|
||||||
raftConfig.HeartbeatTimeout = 500 * time.Millisecond
|
|
||||||
raftConfig.ElectionTimeout = 500 * time.Millisecond
|
|
||||||
raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond
|
|
||||||
// Подавляем вывод предупреждений для одноузлового режима
|
|
||||||
raftConfig.LogOutput = io.Discard
|
|
||||||
logger.Debug("Running in single-node mode (warnings suppressed)")
|
|
||||||
} else {
|
|
||||||
raftConfig.LogOutput = os.Stderr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Создаём директорию для Raft данных
|
|
||||||
dataDir := cfg.Cluster.RaftDataDir
|
|
||||||
if err := os.MkdirAll(dataDir, 0755); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create raft data dir: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Debug(fmt.Sprintf("Raft data directory: %s", dataDir))
|
|
||||||
|
|
||||||
// Создаём хранилище для логов
|
|
||||||
logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft-log.bolt"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create log store: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Создаём хранилище для стабильных данных
|
|
||||||
stableStore, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft-stable.bolt"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create stable store: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Создаём снапшот хранилище
|
|
||||||
snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create snapshot store: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Создаём транспорт
|
|
||||||
transport, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create transport: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Создаём Raft инстанс
|
|
||||||
r, err := raft.NewRaft(raftConfig, rc.fsm, logStore, stableStore, snapshotStore, transport)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create raft: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.raft = r
|
|
||||||
|
|
||||||
// Ждём некоторое время для инициализации Raft
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
|
|
||||||
// Проверяем, нужно ли делать bootstrap
|
|
||||||
bootstrapPath := filepath.Join(dataDir, "raft-log.bolt")
|
|
||||||
_, statErr := os.Stat(bootstrapPath)
|
|
||||||
needsBootstrap := os.IsNotExist(statErr)
|
|
||||||
|
|
||||||
if needsBootstrap && singleNodeMode {
|
|
||||||
logger.Debug("Bootstrapping single-node cluster...")
|
|
||||||
|
|
||||||
configuration := raft.Configuration{
|
|
||||||
Servers: []raft.Server{
|
|
||||||
{
|
|
||||||
ID: raftConfig.LocalID,
|
|
||||||
Address: transport.LocalAddr(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
future := r.BootstrapCluster(configuration)
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
logger.Warn(fmt.Sprintf("Bootstrap error: %v", err))
|
|
||||||
} else {
|
|
||||||
logger.Debug("Single-node cluster bootstrapped successfully")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ждём после bootstrap
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
} else if needsBootstrap && len(cfg.Cluster.Nodes) > 1 {
|
|
||||||
logger.Debug("Bootstrapping multi-node cluster...")
|
|
||||||
|
|
||||||
servers := make([]raft.Server, 0, len(cfg.Cluster.Nodes))
|
|
||||||
for i, nodeAddr := range cfg.Cluster.Nodes {
|
|
||||||
serverID := raft.ServerID(fmt.Sprintf("%s-node%d", rc.clusterName, i+1))
|
|
||||||
servers = append(servers, raft.Server{
|
|
||||||
ID: serverID,
|
|
||||||
Address: raft.ServerAddress(nodeAddr),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
configuration := raft.Configuration{
|
|
||||||
Servers: servers,
|
|
||||||
}
|
|
||||||
|
|
||||||
future := r.BootstrapCluster(configuration)
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
logger.Warn(fmt.Sprintf("Bootstrap error: %v", err))
|
|
||||||
} else {
|
|
||||||
logger.Debug("Multi-node cluster bootstrapped successfully")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Запускаем мониторинг лидера
|
|
||||||
go rc.monitorLeadership()
|
|
||||||
|
|
||||||
// Ждём выборов лидера
|
|
||||||
logger.Debug("Waiting for leader election...")
|
|
||||||
timeout := time.After(5 * time.Second)
|
|
||||||
leaderElected := false
|
|
||||||
|
|
||||||
for !leaderElected {
|
|
||||||
select {
|
|
||||||
case isLeader := <-rc.leaderMonitor:
|
|
||||||
if isLeader {
|
|
||||||
leaderElected = true
|
|
||||||
rc.isLeader.Store(true)
|
|
||||||
logger.Debug("This node is now the cluster leader")
|
|
||||||
}
|
|
||||||
case <-timeout:
|
|
||||||
logger.Warn("Leader election timeout")
|
|
||||||
leaderElected = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Существующее состояние, просто подключаемся
|
|
||||||
logger.Debug("Existing Raft state found, joining cluster...")
|
|
||||||
go rc.monitorLeadership()
|
|
||||||
|
|
||||||
// В одноузловом режиме не ждём лидера
|
|
||||||
if !singleNodeMode {
|
|
||||||
// Проверяем, не являемся ли мы лидером
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
if r.State() == raft.Leader {
|
|
||||||
rc.isLeader.Store(true)
|
|
||||||
logger.Debug("This node is the cluster leader")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Debug(fmt.Sprintf("Raft coordinator started at %s, IsLeader: %v, SingleNodeMode: %v", raftAddr, rc.isLeader.Load(), singleNodeMode))
|
|
||||||
|
|
||||||
return rc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// monitorLeadership отслеживает изменения лидера
|
|
||||||
func (rc *RaftCoordinator) monitorLeadership() {
|
|
||||||
ticker := time.NewTicker(500 * time.Millisecond)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
wasLeader := false
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-rc.stopChan:
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
if rc.raft == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
isLeader := rc.raft.State() == raft.Leader
|
|
||||||
if isLeader != wasLeader {
|
|
||||||
wasLeader = isLeader
|
|
||||||
select {
|
|
||||||
case rc.leaderMonitor <- isLeader:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
if isLeader {
|
|
||||||
rc.isLeader.Store(true)
|
|
||||||
rc.logger.Debug("Leadership acquired")
|
|
||||||
} else {
|
|
||||||
rc.isLeader.Store(false)
|
|
||||||
rc.logger.Debug("Leadership lost")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply применяет команду к FSM
|
|
||||||
func (f *RaftFSM) Apply(log *raft.Log) interface{} {
|
|
||||||
var cmd NodeRegistrationCommand
|
|
||||||
if err := json.Unmarshal(log.Data, &cmd); err != nil {
|
|
||||||
f.logger.Error(fmt.Sprintf("Failed to unmarshal raft command: %v", err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f.state.mu.Lock()
|
|
||||||
defer f.state.mu.Unlock()
|
|
||||||
|
|
||||||
switch cmd.Type {
|
|
||||||
case "register":
|
|
||||||
f.state.Nodes[cmd.Node.ID] = &cmd.Node
|
|
||||||
f.logger.Debug(fmt.Sprintf("Raft: Node registered: %s", cmd.Node.ID))
|
|
||||||
case "remove":
|
|
||||||
delete(f.state.Nodes, cmd.NodeID)
|
|
||||||
f.logger.Debug(fmt.Sprintf("Raft: Node removed: %s", cmd.NodeID))
|
|
||||||
case "set_replication_factor":
|
|
||||||
f.state.ReplicationFactor = cmd.Factor
|
|
||||||
f.logger.Debug(fmt.Sprintf("Raft: Replication factor set to %d", cmd.Factor))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot реализует создание снапшота
|
|
||||||
func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) {
|
|
||||||
f.state.mu.RLock()
|
|
||||||
defer f.state.mu.RUnlock()
|
|
||||||
|
|
||||||
stateCopy := &RaftClusterState{
|
|
||||||
Nodes: make(map[string]*NodeInfo),
|
|
||||||
ReplicationFactor: f.state.ReplicationFactor,
|
|
||||||
}
|
|
||||||
for k, v := range f.state.Nodes {
|
|
||||||
stateCopy.Nodes[k] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return &RaftSnapshot{state: stateCopy}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restore восстанавливает состояние из снапшота
|
|
||||||
func (f *RaftFSM) Restore(snapshot io.ReadCloser) error {
|
|
||||||
defer snapshot.Close()
|
|
||||||
|
|
||||||
var state RaftClusterState
|
|
||||||
decoder := json.NewDecoder(snapshot)
|
|
||||||
if err := decoder.Decode(&state); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f.state.mu.Lock()
|
|
||||||
defer f.state.mu.Unlock()
|
|
||||||
f.state.Nodes = state.Nodes
|
|
||||||
f.state.ReplicationFactor = state.ReplicationFactor
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftSnapshot реализует интерфейс FSMSnapshot
|
|
||||||
type RaftSnapshot struct {
|
|
||||||
state *RaftClusterState
|
|
||||||
}
|
|
||||||
|
|
||||||
// Persist сохраняет снапшот
|
|
||||||
func (s *RaftSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
||||||
err := func() error {
|
|
||||||
data, err := json.Marshal(s.state)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := sink.Write(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return sink.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Release освобождает ресурсы
|
|
||||||
func (s *RaftSnapshot) Release() {}
|
|
||||||
|
|
||||||
// RegisterNode регистрирует узел через Raft
|
|
||||||
func (rc *RaftCoordinator) RegisterNode(node *Node) error {
|
|
||||||
nodeInfo := &NodeInfo{
|
|
||||||
ID: node.ID,
|
|
||||||
IP: node.IP,
|
|
||||||
Port: node.Port,
|
|
||||||
Status: "active",
|
|
||||||
LastSeen: time.Now().Unix(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// В одноузловом режиме всегда считаем себя лидером
|
|
||||||
if rc.singleNodeMode {
|
|
||||||
rc.logger.Debug("Single-node mode: registering node without Raft consensus")
|
|
||||||
|
|
||||||
// Просто сохраняем узел локально
|
|
||||||
rc.nodes.Store(node.ID, nodeInfo)
|
|
||||||
|
|
||||||
// Также сохраняем в FSM
|
|
||||||
rc.fsm.state.mu.Lock()
|
|
||||||
rc.fsm.state.Nodes[node.ID] = nodeInfo
|
|
||||||
rc.fsm.state.mu.Unlock()
|
|
||||||
|
|
||||||
rc.logger.Debug(fmt.Sprintf("Node registered locally in single-node mode: %s", node.ID))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Проверяем, является ли текущий узел лидером
|
|
||||||
if !rc.IsLeader() {
|
|
||||||
leader := rc.GetLeader()
|
|
||||||
if leader != nil {
|
|
||||||
rc.logger.Warn(fmt.Sprintf("Current node is not leader. Leader is %s:%d", leader.IP, leader.Port))
|
|
||||||
return fmt.Errorf("node is not the leader. Please connect to leader at %s:%d", leader.IP, leader.Port)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("node is not the leader and no leader found")
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := NodeRegistrationCommand{
|
|
||||||
Type: "register",
|
|
||||||
Node: *nodeInfo,
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(cmd)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
future := rc.raft.Apply(data, 5*time.Second)
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return fmt.Errorf("failed to register node via raft: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.nodes.Store(node.ID, nodeInfo)
|
|
||||||
|
|
||||||
rc.logger.Debug(fmt.Sprintf("Node registered via Raft: %s", node.ID))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveNode удаляет узел через Raft
|
|
||||||
func (rc *RaftCoordinator) RemoveNode(nodeID string) error {
|
|
||||||
if rc.singleNodeMode {
|
|
||||||
rc.nodes.Delete(nodeID)
|
|
||||||
rc.fsm.state.mu.Lock()
|
|
||||||
delete(rc.fsm.state.Nodes, nodeID)
|
|
||||||
rc.fsm.state.mu.Unlock()
|
|
||||||
rc.logger.Debug(fmt.Sprintf("Node removed locally in single-node mode: %s", nodeID))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !rc.IsLeader() {
|
|
||||||
return fmt.Errorf("node is not the leader")
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := NodeRegistrationCommand{
|
|
||||||
Type: "remove",
|
|
||||||
NodeID: nodeID,
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(cmd)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
future := rc.raft.Apply(data, 5*time.Second)
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return fmt.Errorf("failed to remove node via raft: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.nodes.Delete(nodeID)
|
|
||||||
rc.logger.Debug(fmt.Sprintf("Node removed via Raft: %s", nodeID))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetActiveNodes возвращает активные узлы
|
|
||||||
func (rc *RaftCoordinator) GetActiveNodes() []*NodeInfo {
|
|
||||||
nodes := make([]*NodeInfo, 0)
|
|
||||||
now := time.Now().Unix()
|
|
||||||
|
|
||||||
state := rc.fsm.state
|
|
||||||
state.mu.RLock()
|
|
||||||
defer state.mu.RUnlock()
|
|
||||||
|
|
||||||
for _, nodeInfo := range state.Nodes {
|
|
||||||
if now-nodeInfo.LastSeen < 30 {
|
|
||||||
nodes = append(nodes, nodeInfo)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// В одноузловом режиме, если список пуст, возвращаем локальный узел
|
|
||||||
if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil {
|
|
||||||
nodes = append(nodes, rc.localNodeInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllNodes возвращает все узлы
|
|
||||||
func (rc *RaftCoordinator) GetAllNodes() []*NodeInfo {
|
|
||||||
state := rc.fsm.state
|
|
||||||
state.mu.RLock()
|
|
||||||
defer state.mu.RUnlock()
|
|
||||||
|
|
||||||
nodes := make([]*NodeInfo, 0, len(state.Nodes))
|
|
||||||
for _, node := range state.Nodes {
|
|
||||||
nodes = append(nodes, node)
|
|
||||||
}
|
|
||||||
|
|
||||||
// В одноузловом режиме, если список пуст, возвращаем локальный узел
|
|
||||||
if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil {
|
|
||||||
nodes = append(nodes, rc.localNodeInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLeader возвращает лидера
|
|
||||||
func (rc *RaftCoordinator) GetLeader() *NodeInfo {
|
|
||||||
if rc.singleNodeMode {
|
|
||||||
return rc.localNodeInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
leaderAddr := rc.raft.Leader()
|
|
||||||
if leaderAddr == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
state := rc.fsm.state
|
|
||||||
state.mu.RLock()
|
|
||||||
defer state.mu.RUnlock()
|
|
||||||
|
|
||||||
for _, node := range state.Nodes {
|
|
||||||
nodeAddr := fmt.Sprintf("%s:%d", node.IP, node.Port)
|
|
||||||
if nodeAddr == string(leaderAddr) {
|
|
||||||
return node
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsLeader проверяет, является ли текущий узел лидером
|
|
||||||
func (rc *RaftCoordinator) IsLeader() bool {
|
|
||||||
// В одноузловом режиме всегда лидер
|
|
||||||
if rc.singleNodeMode {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return rc.isLeader.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SendHeartbeat обновляет heartbeat узла
|
|
||||||
func (rc *RaftCoordinator) SendHeartbeat(nodeID string) {
|
|
||||||
if val, ok := rc.nodes.Load(nodeID); ok {
|
|
||||||
nodeInfo := val.(*NodeInfo)
|
|
||||||
nodeInfo.LastSeen = time.Now().Unix()
|
|
||||||
rc.nodes.Store(nodeID, nodeInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Также обновляем в FSM
|
|
||||||
rc.fsm.state.mu.Lock()
|
|
||||||
if nodeInfo, ok := rc.fsm.state.Nodes[nodeID]; ok {
|
|
||||||
nodeInfo.LastSeen = time.Now().Unix()
|
|
||||||
}
|
|
||||||
rc.fsm.state.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetClusterStatus возвращает статус кластера
|
|
||||||
func (rc *RaftCoordinator) GetClusterStatus() *ClusterStatus {
|
|
||||||
nodes := rc.GetAllNodes()
|
|
||||||
activeNodes := rc.GetActiveNodes()
|
|
||||||
|
|
||||||
syncingNodes := 0
|
|
||||||
for _, node := range nodes {
|
|
||||||
if node.Status == "syncing" {
|
|
||||||
syncingNodes++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
leader := rc.GetLeader()
|
|
||||||
leaderID := ""
|
|
||||||
if leader != nil {
|
|
||||||
leaderID = leader.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
return &ClusterStatus{
|
|
||||||
Name: rc.clusterName,
|
|
||||||
TotalNodes: len(nodes),
|
|
||||||
ActiveNodes: len(activeNodes),
|
|
||||||
SyncingNodes: syncingNodes,
|
|
||||||
FailedNodes: len(nodes) - len(activeNodes),
|
|
||||||
ReplicationFactor: int(rc.replicationFactor.Load()),
|
|
||||||
LeaderID: leaderID,
|
|
||||||
Health: rc.calculateHealth(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculateHealth вычисляет здоровье кластера
|
|
||||||
func (rc *RaftCoordinator) calculateHealth() string {
|
|
||||||
activeNodes := rc.GetActiveNodes()
|
|
||||||
totalNodes := rc.GetAllNodes()
|
|
||||||
|
|
||||||
if len(totalNodes) == 0 {
|
|
||||||
return "critical"
|
|
||||||
}
|
|
||||||
|
|
||||||
ratio := float64(len(activeNodes)) / float64(len(totalNodes))
|
|
||||||
if ratio >= 0.8 {
|
|
||||||
return "healthy"
|
|
||||||
} else if ratio >= 0.5 {
|
|
||||||
return "degraded"
|
|
||||||
}
|
|
||||||
return "critical"
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetReplicationFactor возвращает фактор репликации
|
|
||||||
func (rc *RaftCoordinator) GetReplicationFactor() int {
|
|
||||||
return int(rc.replicationFactor.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetReplicationFactor устанавливает фактор репликации через Raft
|
|
||||||
func (rc *RaftCoordinator) SetReplicationFactor(factor int) error {
|
|
||||||
if !rc.IsLeader() {
|
|
||||||
return fmt.Errorf("node is not the leader")
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := NodeRegistrationCommand{
|
|
||||||
Type: "set_replication_factor",
|
|
||||||
Factor: int32(factor),
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(cmd)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
future := rc.raft.Apply(data, 5*time.Second)
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return fmt.Errorf("failed to set replication factor via raft: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.replicationFactor.Store(int32(factor))
|
|
||||||
rc.logger.Debug(fmt.Sprintf("Replication factor set to %d via Raft", factor))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetClusterHealth возвращает детальную информацию о здоровье кластера
|
|
||||||
func (rc *RaftCoordinator) GetClusterHealth() *ClusterHealth {
|
|
||||||
health := &ClusterHealth{
|
|
||||||
Nodes: make(map[string]*NodeHealth),
|
|
||||||
OverallScore: 100.0,
|
|
||||||
Recommendations: "",
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now().Unix()
|
|
||||||
state := rc.fsm.state
|
|
||||||
state.mu.RLock()
|
|
||||||
defer state.mu.RUnlock()
|
|
||||||
|
|
||||||
for nodeID, nodeInfo := range state.Nodes {
|
|
||||||
nodeHealth := &NodeHealth{
|
|
||||||
Status: nodeInfo.Status,
|
|
||||||
LatencyMs: 0,
|
|
||||||
LastCheck: now,
|
|
||||||
}
|
|
||||||
|
|
||||||
if now-nodeInfo.LastSeen > 30 {
|
|
||||||
nodeHealth.Status = "offline"
|
|
||||||
health.OverallScore -= 10
|
|
||||||
} else if nodeInfo.Status == "syncing" {
|
|
||||||
health.OverallScore -= 5
|
|
||||||
}
|
|
||||||
|
|
||||||
health.Nodes[nodeID] = nodeHealth
|
|
||||||
}
|
|
||||||
|
|
||||||
if health.OverallScore < 50 {
|
|
||||||
health.Recommendations = "Critical: Check network connectivity and node health immediately"
|
|
||||||
} else if health.OverallScore < 80 {
|
|
||||||
health.Recommendations = "Warning: Some nodes are offline or syncing, consider adding more nodes"
|
|
||||||
} else {
|
|
||||||
health.Recommendations = "Cluster is healthy, all systems operational"
|
|
||||||
}
|
|
||||||
|
|
||||||
return health
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsReplicationEnabled возвращает статус репликации
|
|
||||||
func (rc *RaftCoordinator) IsReplicationEnabled() bool {
|
|
||||||
return rc.replicationEnabled
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsMasterMasterEnabled возвращает статус мастер-мастер репликации
|
|
||||||
func (rc *RaftCoordinator) IsMasterMasterEnabled() bool {
|
|
||||||
return rc.masterMasterEnabled
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsSyncReplicationEnabled возвращает статус синхронной репликации
|
|
||||||
func (rc *RaftCoordinator) IsSyncReplicationEnabled() bool {
|
|
||||||
return rc.syncReplication
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop останавливает координатор
|
|
||||||
func (rc *RaftCoordinator) Stop() {
|
|
||||||
close(rc.stopChan)
|
|
||||||
if rc.raft != nil {
|
|
||||||
rc.raft.Shutdown()
|
|
||||||
}
|
|
||||||
rc.logger.Debug("Raft coordinator stopped")
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user