Files
futriix/internal/cluster/raft_coordinator.go

1037 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// File: internal/cluster/raft_coordinator.go
// Purpose: distributed cluster coordinator built on the Raft consensus algorithm.
// Handles cluster node management, leader election, replication and fault tolerance.
// Supports both a single-node mode and a multi-node configuration with synchronous/asynchronous replication.
// Timestamps are recorded for all operations.
package cluster
import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/raft"
"futriis/internal/log"
"futriis/internal/config"
)
// InmemStore is a small file-backed key/value store used as both the Raft log
// store (raft.LogStore) and the Raft stable store (raft.StableStore).
//
// Despite its name it is not purely in-memory. All entries live in a map
// guarded by mu. When path is non-empty, every mutation rewrites the whole map
// as JSON to path, so a single write costs O(number of stored entries).
// NOTE(review): that is acceptable only while Raft log compaction keeps the
// stored log short.
//
// Log entries are stored as JSON-encoded raft.Log values under keys produced by
// inmemLogKey ("log-<index>"). Stable-store keys are stored verbatim.
type InmemStore struct {
	mu        sync.RWMutex
	data      map[string][]byte
	path      string // backing JSON file; empty means memory only
	createdAt int64  // Unix milliseconds when the store was constructed
	updatedAt int64  // Unix milliseconds of the last successful load or last persist attempt
}

// Compile-time checks that InmemStore satisfies the Raft storage interfaces.
var (
	_ raft.LogStore    = (*InmemStore)(nil)
	_ raft.StableStore = (*InmemStore)(nil)
)

// inmemLogKey returns the map key under which the log entry with index idx is stored.
func inmemLogKey(idx uint64) string {
	return fmt.Sprintf("log-%d", idx)
}

// inmemLogIndex extracts the log index from a key produced by inmemLogKey.
// ok is false for keys that are not log-entry keys (e.g. stable-store keys).
func inmemLogIndex(key string) (idx uint64, ok bool) {
	_, err := fmt.Sscanf(key, "log-%d", &idx)
	return idx, err == nil
}

// NewInmemStore creates a store backed by the JSON file at path and loads any
// previously persisted contents. An empty path yields a memory-only store.
func NewInmemStore(path string) *InmemStore {
	now := time.Now().UnixMilli()
	store := &InmemStore{
		data:      make(map[string][]byte),
		path:      path,
		createdAt: now,
		updatedAt: now,
	}
	// Best effort: a missing or unreadable file starts an empty store.
	store.load()
	return store
}

// load replaces the in-memory map with the contents of s.path, provided the file
// exists and holds a JSON object. It is best effort: a missing, unreadable or
// malformed file leaves the store empty.
// NOTE(review): silently starting empty on a corrupt file discards Raft
// term/vote state; consider surfacing this to the caller.
func (s *InmemStore) load() {
	if s.path == "" {
		return
	}
	raw, err := os.ReadFile(s.path)
	if err != nil {
		return
	}
	// Decode into a fresh map for two reasons. A failed decode must not leave
	// s.data half-populated. And the JSON literal null would set the map to nil,
	// which would make every later write panic.
	decoded := make(map[string][]byte)
	if err := json.Unmarshal(raw, &decoded); err != nil || decoded == nil {
		return
	}
	s.data = decoded
	s.updatedAt = time.Now().UnixMilli()
}

// save writes the whole map to s.path. The caller must hold s.mu for writing.
//
// The data goes to a temporary file in the same directory, is flushed with
// Sync, and is then renamed over s.path. A crash mid-write therefore never
// leaves a truncated file behind, which writing the target in place could.
// Memory-only stores (empty path) return nil immediately.
// NOTE(review): the containing directory is not fsynced after the rename.
func (s *InmemStore) save() (err error) {
	if s.path == "" {
		return nil
	}
	s.updatedAt = time.Now().UnixMilli()
	raw, err := json.Marshal(s.data)
	if err != nil {
		return fmt.Errorf("encode raft store %s: %w", s.path, err)
	}
	tmp, err := os.CreateTemp(filepath.Dir(s.path), filepath.Base(s.path)+".tmp-*")
	if err != nil {
		return fmt.Errorf("create temp file for %s: %w", s.path, err)
	}
	defer func() {
		if err != nil {
			// Best-effort cleanup; the original error is what gets reported.
			_ = tmp.Close()
			_ = os.Remove(tmp.Name())
		}
	}()
	if _, err = tmp.Write(raw); err != nil {
		return fmt.Errorf("write %s: %w", tmp.Name(), err)
	}
	// CreateTemp uses mode 0600; keep the 0644 mode the store has always used.
	if err = tmp.Chmod(0644); err != nil {
		return fmt.Errorf("chmod %s: %w", tmp.Name(), err)
	}
	if err = tmp.Sync(); err != nil {
		return fmt.Errorf("sync %s: %w", tmp.Name(), err)
	}
	if err = tmp.Close(); err != nil {
		return fmt.Errorf("close %s: %w", tmp.Name(), err)
	}
	if err = os.Rename(tmp.Name(), s.path); err != nil {
		return fmt.Errorf("replace %s: %w", s.path, err)
	}
	return nil
}

// GetCreatedAt returns the store creation time in Unix milliseconds.
func (s *InmemStore) GetCreatedAt() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.createdAt
}

// GetUpdatedAt returns the time of the last load or persist attempt in Unix milliseconds.
func (s *InmemStore) GetUpdatedAt() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.updatedAt
}

// ==================== raft.LogStore ====================

// FirstIndex returns the smallest stored log index, or 0 when no log entries are stored.
func (s *InmemStore) FirstIndex() (uint64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var first uint64
	for key := range s.data {
		if idx, ok := inmemLogIndex(key); ok && (first == 0 || idx < first) {
			first = idx
		}
	}
	return first, nil
}

// LastIndex returns the largest stored log index, or 0 when no log entries are stored.
func (s *InmemStore) LastIndex() (uint64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var last uint64
	for key := range s.data {
		if idx, ok := inmemLogIndex(key); ok && idx > last {
			last = idx
		}
	}
	return last, nil
}

// GetLog decodes the entry stored at index idx into entry. It returns
// raft.ErrLogNotFound when no such entry exists.
func (s *InmemStore) GetLog(idx uint64, entry *raft.Log) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	raw, ok := s.data[inmemLogKey(idx)]
	if !ok {
		return raft.ErrLogNotFound
	}
	return json.Unmarshal(raw, entry)
}

// StoreLog stores a single log entry; see StoreLogs.
func (s *InmemStore) StoreLog(entry *raft.Log) error {
	return s.StoreLogs([]*raft.Log{entry})
}

// StoreLogs stores the given entries, keyed by their Index, and persists the store.
//
// All entries are encoded before the map is touched, so an encode failure
// leaves the store unchanged. If persisting fails, the error is returned, but
// the in-memory map already holds the new entries. They are written out by the
// next successful persist.
func (s *InmemStore) StoreLogs(logs []*raft.Log) error {
	encoded := make(map[string][]byte, len(logs))
	for _, entry := range logs {
		raw, err := json.Marshal(entry)
		if err != nil {
			return err
		}
		encoded[inmemLogKey(entry.Index)] = raw
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for key, raw := range encoded {
		s.data[key] = raw
	}
	return s.save()
}

// DeleteRange removes the log entries with indexes in [minIdx, maxIdx]
// (inclusive) and persists the result. An inverted range deletes nothing.
func (s *InmemStore) DeleteRange(minIdx, maxIdx uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if minIdx > maxIdx {
		return nil
	}
	if maxIdx-minIdx < uint64(len(s.data)) {
		// Narrow range: delete index by index. The loop stops on idx == maxIdx
		// before incrementing. A plain `idx <= maxIdx` loop would never terminate
		// when maxIdx is math.MaxUint64.
		for idx := minIdx; ; idx++ {
			delete(s.data, inmemLogKey(idx))
			if idx == maxIdx {
				break
			}
		}
	} else {
		// Range wider than the store (e.g. after compaction): scan the stored
		// keys instead of visiting every index in the range.
		for key := range s.data {
			if idx, ok := inmemLogIndex(key); ok && idx >= minIdx && idx <= maxIdx {
				delete(s.data, key)
			}
		}
	}
	return s.save()
}

// ==================== raft.StableStore ====================

// Get returns the value stored under key, or (nil, nil) when the key is unknown.
func (s *InmemStore) Get(key []byte) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	val, ok := s.data[string(key)]
	if !ok {
		return nil, nil
	}
	return val, nil
}

// Set stores val under key and persists the store.
func (s *InmemStore) Set(key []byte, val []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[string(key)] = val
	return s.save()
}

// SetUint64 stores val under key as a decimal string.
func (s *InmemStore) SetUint64(key []byte, val uint64) error {
	return s.Set(key, []byte(fmt.Sprintf("%d", val)))
}

// GetUint64 returns the decimal value stored under key.
//
// A missing or empty value yields 0. A value that is not a valid decimal
// returns an error instead of silently becoming 0; a zeroed Raft term would
// violate Raft's safety guarantees.
func (s *InmemStore) GetUint64(key []byte) (uint64, error) {
	raw, err := s.Get(key)
	if err != nil {
		return 0, err
	}
	if len(raw) == 0 {
		return 0, nil
	}
	var result uint64
	if _, err := fmt.Sscanf(string(raw), "%d", &result); err != nil {
		return 0, fmt.Errorf("parse uint64 value for key %q: %w", key, err)
	}
	return result, nil
}
// RaftClusterState is the cluster state held by RaftFSM: the known nodes keyed
// by node ID, the replication factor, and creation/update times in Unix
// milliseconds. It is JSON-encoded for Raft snapshots. mu is unexported and is
// therefore not serialized; it guards the other fields.
// NOTE: the JSON field order below is the order used in snapshot files.
type RaftClusterState struct {
// Nodes maps node ID to node information.
Nodes map[string]*NodeInfo `json:"nodes"`
// ReplicationFactor is the replication factor set via "set_replication_factor" commands.
ReplicationFactor int32 `json:"replication_factor"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
// mu guards all fields above.
mu sync.RWMutex
}
// RaftCoordinator coordinates cluster membership and leadership through Raft.
// In single-node mode node registration bypasses consensus and the node always
// reports itself as leader.
type RaftCoordinator struct {
// raft is the underlying Raft instance.
raft *raft.Raft
// fsm holds the cluster state machine that Raft applies commands to.
fsm *RaftFSM
// address is "<ip>:<NodePort>"; raftAddr is "<ip>:<RaftPort>" used by the Raft transport.
address string
raftAddr string
clusterName string
logger *log.Logger
config *config.Config
// stopChan is closed by Stop to terminate monitorLeadership.
stopChan chan struct{}
// nodes caches nodeID -> *NodeInfo for nodes registered/removed through this coordinator.
nodes sync.Map
// replicationFactor starts at 3 and is updated by SetReplicationFactor after a successful Raft apply.
replicationFactor atomic.Int32
replicationEnabled bool
masterMasterEnabled bool
syncReplication bool
// isLeader is the cached leadership flag maintained by monitorLeadership.
isLeader atomic.Bool
// leaderMonitor (buffer size 1) carries leadership transitions published by monitorLeadership.
leaderMonitor chan bool
singleNodeMode bool
// localNodeInfo describes this node; its ID is also the Raft LocalID.
localNodeInfo *NodeInfo
// logStore and stableStore are the file-backed Raft stores in the Raft data directory.
logStore *InmemStore
stableStore *InmemStore
// createdAt is when this coordinator was constructed (Unix milliseconds).
createdAt int64
// leaderSince is when this node last became leader (Unix milliseconds, 0 if never).
leaderSince atomic.Int64
// lastElection is set when leadership is lost, and when leadership is won during startup.
lastElection atomic.Int64
// electionCount counts leadership acquisitions observed by monitorLeadership.
electionCount atomic.Uint64
}
// RaftFSM is the raft.FSM implementation. It applies JSON-encoded
// NodeRegistrationCommand log entries to the shared RaftClusterState.
type RaftFSM struct {
state *RaftClusterState
logger *log.Logger
// createdAt is the FSM construction time in Unix milliseconds.
createdAt int64
}
// NodeRegistrationCommand is the JSON payload of every Raft log entry applied by
// RaftFSM.Apply. Type selects the operation: "register" (uses Node), "remove"
// (uses NodeID) or "set_replication_factor" (uses Factor). Timestamp is the
// proposer's clock in Unix milliseconds.
// NOTE(review): omitempty has no effect on the struct-typed Node field, so it is
// always encoded; field order determines the encoded log bytes.
type NodeRegistrationCommand struct {
Type string `json:"type"`
Node NodeInfo `json:"node,omitempty"`
NodeID string `json:"node_id,omitempty"`
Factor int32 `json:"factor,omitempty"`
Timestamp int64 `json:"timestamp"`
}
// getLocalIP returns the first non-loopback IPv4 address found among the host's
// interface addresses. It falls back to "127.0.0.1" when the addresses cannot
// be listed or none qualifies.
func getLocalIP() string {
	const fallback = "127.0.0.1"
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return fallback
	}
	for _, a := range ifaceAddrs {
		ipNet, isIPNet := a.(*net.IPNet)
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		if ipNet.IP.To4() == nil {
			continue // IPv6 only
		}
		return ipNet.IP.String()
	}
	return fallback
}
// NewRaftCoordinator creates the Raft-backed cluster coordinator for this node.
//
// The node IP comes from cfg.Cluster.NodeIP, auto-detected via getLocalIP when
// empty or "0.0.0.0". The function builds the FSM and creates the file-backed
// log/stable stores, the file snapshot store and the TCP transport under
// cfg.Cluster.RaftDataDir, then starts Raft. When no Raft state exists yet it
// bootstraps a cluster:
//   - single-node mode (at most one configured node, or cfg.Cluster.Bootstrap):
//     a one-server configuration; the local node is put into the FSM state and
//     reported as leader immediately;
//   - multi-node mode: every entry of cfg.Cluster.Nodes (Raft addresses),
//     after which it waits up to 5s for this node to win the election.
//
// With existing state it starts the leadership monitor and rejoins.
// It returns an error if the data directory, snapshot store, state check,
// transport or Raft instance cannot be created.
func NewRaftCoordinator(cfg *config.Config, logger *log.Logger) (*RaftCoordinator, error) {
	now := time.Now().UnixMilli()
	nodeIP := cfg.Cluster.NodeIP
	if nodeIP == "" || nodeIP == "0.0.0.0" {
		nodeIP = getLocalIP()
	}
	raftAddr := fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.RaftPort)
	logger.Debug(fmt.Sprintf("Creating Raft coordinator at %s", raftAddr))
	singleNodeMode := len(cfg.Cluster.Nodes) <= 1 || cfg.Cluster.Bootstrap

	rc := &RaftCoordinator{
		address:             fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.NodePort),
		raftAddr:            raftAddr,
		clusterName:         cfg.Cluster.Name,
		logger:              logger,
		config:              cfg,
		stopChan:            make(chan struct{}),
		leaderMonitor:       make(chan bool, 1),
		replicationEnabled:  cfg.Replication.Enabled,
		masterMasterEnabled: cfg.Replication.MasterMaster,
		syncReplication:     cfg.Replication.SyncReplication,
		singleNodeMode:      singleNodeMode,
		createdAt:           now,
		localNodeInfo: &NodeInfo{
			ID:        fmt.Sprintf("%s-%s", cfg.Cluster.Name, nodeIP),
			IP:        nodeIP,
			Port:      cfg.Cluster.NodePort,
			Status:    "active",
			LastSeen:  now,
			JoinedAt:  now,
			UpdatedAt: now,
			Version:   1,
		},
	}

	// Default replication factor until a set_replication_factor command is applied.
	defaultReplicationFactor := int32(3)
	rc.replicationFactor.Store(defaultReplicationFactor)

	rc.fsm = &RaftFSM{
		state: &RaftClusterState{
			Nodes:             make(map[string]*NodeInfo),
			ReplicationFactor: rc.replicationFactor.Load(),
			CreatedAt:         now,
			UpdatedAt:         now,
		},
		logger:    logger,
		createdAt: now,
	}

	if singleNodeMode {
		rc.fsm.state.mu.Lock()
		rc.fsm.state.Nodes[rc.localNodeInfo.ID] = rc.localNodeInfo
		rc.fsm.state.mu.Unlock()
		rc.isLeader.Store(true)
		rc.leaderSince.Store(now)
		logger.Debug(fmt.Sprintf("Single-node mode: local node added to state: %s (joined at %s)",
			rc.localNodeInfo.ID, rc.localNodeInfo.NodeJoinedAt()))
	}

	raftConfig := raft.DefaultConfig()
	raftConfig.LocalID = raft.ServerID(rc.localNodeInfo.ID)
	raftConfig.HeartbeatTimeout = 1000 * time.Millisecond
	raftConfig.ElectionTimeout = 1000 * time.Millisecond
	raftConfig.CommitTimeout = 500 * time.Millisecond
	raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond

	// Use one log sink for every Raft component (core, snapshot store,
	// transport), so single-node mode silences the whole library and not just
	// the core.
	var raftLogOutput io.Writer = os.Stderr
	if singleNodeMode {
		raftConfig.HeartbeatTimeout = 500 * time.Millisecond
		raftConfig.ElectionTimeout = 500 * time.Millisecond
		raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond
		raftLogOutput = io.Discard
		logger.Debug("Running in single-node mode (warnings suppressed)")
	}
	raftConfig.LogOutput = raftLogOutput

	dataDir := cfg.Cluster.RaftDataDir
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create raft data dir: %w", err)
	}
	logger.Debug(fmt.Sprintf("Raft data directory: %s", dataDir))

	// File-backed stores are used instead of boltDB.
	rc.logStore = NewInmemStore(filepath.Join(dataDir, "raft-log.json"))
	rc.stableStore = NewInmemStore(filepath.Join(dataDir, "raft-stable.json"))

	snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, raftLogOutput)
	if err != nil {
		return nil, fmt.Errorf("failed to create snapshot store: %w", err)
	}

	// Let Raft decide whether state exists (current term, last log index,
	// snapshots) rather than inferring it from the presence of a single file.
	hasState, err := raft.HasExistingState(rc.logStore, rc.stableStore, snapshotStore)
	if err != nil {
		return nil, fmt.Errorf("failed to inspect existing raft state: %w", err)
	}

	transport, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, raftLogOutput)
	if err != nil {
		return nil, fmt.Errorf("failed to create transport: %w", err)
	}

	r, err := raft.NewRaft(raftConfig, rc.fsm, rc.logStore, rc.stableStore, snapshotStore, transport)
	if err != nil {
		// Release the listening socket; nothing else owns the transport yet.
		transport.Close()
		return nil, fmt.Errorf("failed to create raft: %w", err)
	}
	rc.raft = r
	time.Sleep(500 * time.Millisecond)

	needsBootstrap := !hasState
	switch {
	case needsBootstrap && singleNodeMode:
		logger.Debug("Bootstrapping single-node cluster...")
		configuration := raft.Configuration{
			Servers: []raft.Server{{
				ID:      raftConfig.LocalID,
				Address: transport.LocalAddr(),
			}},
		}
		if err := r.BootstrapCluster(configuration).Error(); err != nil {
			logger.Warn(fmt.Sprintf("Bootstrap error: %v", err))
		} else {
			logger.Debug(fmt.Sprintf("Single-node cluster bootstrapped successfully at %s",
				time.Now().Format("2006-01-02 15:04:05.000")))
		}
		time.Sleep(1 * time.Second)

	case needsBootstrap && len(cfg.Cluster.Nodes) > 1:
		logger.Debug(fmt.Sprintf("Bootstrapping multi-node cluster at %s...",
			time.Now().Format("2006-01-02 15:04:05.000")))
		// Server IDs must follow the same "<cluster>-<host>" scheme as LocalID.
		// Otherwise no node finds itself in the configuration, Raft refuses to
		// start an election, and the cluster never gets a leader.
		servers := make([]raft.Server, 0, len(cfg.Cluster.Nodes))
		localListed := false
		for _, nodeAddr := range cfg.Cluster.Nodes {
			id := raftServerIDForAddr(rc.clusterName, string(nodeAddr))
			localListed = localListed || id == raftConfig.LocalID
			servers = append(servers, raft.Server{
				ID:      id,
				Address: raft.ServerAddress(nodeAddr),
			})
		}
		if !localListed {
			logger.Warn(fmt.Sprintf("Local raft ID %s does not match any configured cluster node; this node cannot vote or become leader",
				raftConfig.LocalID))
		}
		if err := r.BootstrapCluster(raft.Configuration{Servers: servers}).Error(); err != nil {
			logger.Warn(fmt.Sprintf("Bootstrap error: %v", err))
		} else {
			logger.Debug("Multi-node cluster bootstrapped successfully")
		}

		go rc.monitorLeadership()
		logger.Debug("Waiting for leader election...")
		timeout := time.After(5 * time.Second)
	waitForLeader:
		for {
			select {
			case isLeader := <-rc.leaderMonitor:
				if isLeader {
					rc.isLeader.Store(true)
					rc.leaderSince.Store(time.Now().UnixMilli())
					rc.lastElection.Store(time.Now().UnixMilli())
					logger.Debug(fmt.Sprintf("This node is now the cluster leader (elected at %s)",
						time.Now().Format("2006-01-02 15:04:05.000")))
					break waitForLeader
				}
			case <-timeout:
				logger.Warn("Leader election timeout")
				break waitForLeader
			}
		}

	default:
		logger.Debug("Existing Raft state found, joining cluster...")
		go rc.monitorLeadership()
		if !singleNodeMode {
			time.Sleep(1 * time.Second)
			if r.State() == raft.Leader {
				rc.isLeader.Store(true)
				rc.leaderSince.Store(time.Now().UnixMilli())
				logger.Debug(fmt.Sprintf("This node is the cluster leader (since %s)",
					time.UnixMilli(rc.leaderSince.Load()).Format("2006-01-02 15:04:05.000")))
			}
		}
	}

	logger.Debug(fmt.Sprintf("Raft coordinator started at %s, IsLeader: %v, SingleNodeMode: %v, Created: %s",
		raftAddr, rc.isLeader.Load(), singleNodeMode, time.UnixMilli(rc.createdAt).Format("2006-01-02 15:04:05.000")))
	return rc, nil
}

// raftServerIDForAddr derives the Raft server ID for a peer listed as "host:port",
// using the same "<cluster>-<host>" scheme as the local node's LocalID. An entry
// that is not in host:port form is used whole as the host.
// NOTE(review): peers sharing a host get duplicate IDs, which Raft rejects at bootstrap.
func raftServerIDForAddr(clusterName, addr string) raft.ServerID {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		host = addr
	}
	return raft.ServerID(fmt.Sprintf("%s-%s", clusterName, host))
}
// monitorLeadership polls the Raft state every 500ms until rc.stopChan is closed.
// On each leadership transition it:
//   - publishes the new value on rc.leaderMonitor;
//   - updates isLeader;
//   - on gain, updates leaderSince and electionCount;
//   - on loss, updates lastElection;
//   - logs the change.
func (rc *RaftCoordinator) monitorLeadership() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	wasLeader := false
	for {
		select {
		case <-rc.stopChan:
			return
		case <-ticker.C:
		}
		if rc.raft == nil {
			continue
		}
		isLeader := rc.raft.State() == raft.Leader
		if isLeader == wasLeader {
			continue
		}
		wasLeader = isLeader

		// leaderMonitor has a single buffer slot, and this goroutine is the only
		// sender in this file. First discard any value nobody consumed yet, so
		// the buffer always holds the latest transition. A plain non-blocking
		// send would drop the newer value, and a reader could then act on a
		// stale "true" after leadership was already lost.
		select {
		case <-rc.leaderMonitor:
		default:
		}
		select {
		case rc.leaderMonitor <- isLeader:
		default:
		}

		if isLeader {
			rc.isLeader.Store(true)
			rc.leaderSince.Store(time.Now().UnixMilli())
			count := rc.electionCount.Add(1)
			rc.logger.Debug(fmt.Sprintf("Leadership acquired at %s (election #%d)",
				time.Now().Format("2006-01-02 15:04:05.000"), count))
		} else {
			rc.isLeader.Store(false)
			rc.lastElection.Store(time.Now().UnixMilli())
			rc.logger.Debug(fmt.Sprintf("Leadership lost at %s",
				time.Now().Format("2006-01-02 15:04:05.000")))
		}
	}
}
// Apply decodes a NodeRegistrationCommand from a committed Raft log entry and
// applies it to the cluster state. It returns nil on success or an error, which
// callers of raft.Apply receive through ApplyFuture.Response. An error is
// returned when the payload is not valid JSON or the command type is unknown.
//
// Raft replays the same log on every replica and after restarts, so Apply must
// be deterministic. Timestamps therefore come from the command (the proposer's
// clock), not the local clock. The local clock is used only when a command
// carries no timestamp.
func (f *RaftFSM) Apply(entry *raft.Log) interface{} {
	var cmd NodeRegistrationCommand
	if err := json.Unmarshal(entry.Data, &cmd); err != nil {
		f.logger.Error(fmt.Sprintf("Failed to unmarshal raft command: %v", err))
		return err
	}
	ts := cmd.Timestamp
	if ts == 0 {
		ts = time.Now().UnixMilli()
	}

	f.state.mu.Lock()
	defer f.state.mu.Unlock()

	switch cmd.Type {
	case "register":
		node := cmd.Node
		node.JoinedAt = cmd.Timestamp
		node.UpdatedAt = ts
		node.LastSeen = cmd.Timestamp
		f.state.Nodes[node.ID] = &node
		f.logger.Debug(fmt.Sprintf("Raft: Node registered: %s (joined at %s)",
			node.ID, time.UnixMilli(node.JoinedAt).Format("2006-01-02 15:04:05.000")))
	case "remove":
		delete(f.state.Nodes, cmd.NodeID)
		f.logger.Debug(fmt.Sprintf("Raft: Node removed: %s at %s", cmd.NodeID,
			time.UnixMilli(ts).Format("2006-01-02 15:04:05.000")))
	case "set_replication_factor":
		oldFactor := f.state.ReplicationFactor
		f.state.ReplicationFactor = cmd.Factor
		f.logger.Debug(fmt.Sprintf("Raft: Replication factor changed from %d to %d at %s",
			oldFactor, cmd.Factor, time.UnixMilli(ts).Format("2006-01-02 15:04:05.000")))
	default:
		// Do not silently accept commands this FSM does not understand.
		f.logger.Warn(fmt.Sprintf("Raft: unknown command type %q ignored", cmd.Type))
		return fmt.Errorf("unknown raft command type %q", cmd.Type)
	}
	f.state.UpdatedAt = ts
	return nil
}
// Snapshot captures the current cluster state for Raft log compaction.
//
// Raft calls Persist on the returned snapshot concurrently with later Apply
// calls. SendHeartbeat also mutates *NodeInfo values in place under state.mu.
// So the snapshot clones every NodeInfo rather than sharing the live pointers;
// otherwise Persist would read memory that is being written.
// NOTE(review): the clone is shallow; slice/map fields of NodeInfo, if any,
// would still be shared.
func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) {
	f.state.mu.RLock()
	defer f.state.mu.RUnlock()
	stateCopy := &RaftClusterState{
		Nodes:             make(map[string]*NodeInfo, len(f.state.Nodes)),
		ReplicationFactor: f.state.ReplicationFactor,
		CreatedAt:         f.state.CreatedAt,
		UpdatedAt:         time.Now().UnixMilli(),
	}
	for id, info := range f.state.Nodes {
		var clone *NodeInfo
		if info != nil {
			c := *info
			clone = &c
		}
		stateCopy.Nodes[id] = clone
	}
	return &RaftSnapshot{state: stateCopy}, nil
}
// Restore replaces the cluster state with the JSON snapshot read from snapshot,
// which it always closes. Nodes, ReplicationFactor and CreatedAt come from the
// snapshot; UpdatedAt is set to the restore time. A decode error is returned
// and leaves the current state untouched.
func (f *RaftFSM) Restore(snapshot io.ReadCloser) error {
	defer snapshot.Close()
	var state RaftClusterState
	if err := json.NewDecoder(snapshot).Decode(&state); err != nil {
		return fmt.Errorf("decode raft snapshot: %w", err)
	}
	// A snapshot with a null or missing "nodes" field would leave Nodes nil, and
	// the next "register" command would panic on assignment to a nil map. Nil
	// entries would likewise panic in readers that dereference each node.
	if state.Nodes == nil {
		state.Nodes = make(map[string]*NodeInfo)
	}
	for id, info := range state.Nodes {
		if info == nil {
			delete(state.Nodes, id)
		}
	}

	f.state.mu.Lock()
	defer f.state.mu.Unlock()
	f.state.Nodes = state.Nodes
	f.state.ReplicationFactor = state.ReplicationFactor
	f.state.CreatedAt = state.CreatedAt
	f.state.UpdatedAt = time.Now().UnixMilli()
	f.logger.Debug(fmt.Sprintf("Raft state restored from snapshot at %s",
		time.UnixMilli(f.state.UpdatedAt).Format("2006-01-02 15:04:05.000")))
	return nil
}
// RaftSnapshot is the raft.FSMSnapshot returned by RaftFSM.Snapshot. It holds
// the cluster state captured at snapshot time.
type RaftSnapshot struct {
	state *RaftClusterState
}

// Persist JSON-encodes the captured state, writes it to sink and closes the
// sink. On any failure the sink is cancelled and the error is returned.
func (s *RaftSnapshot) Persist(sink raft.SnapshotSink) error {
	payload, err := json.Marshal(s.state)
	if err == nil {
		_, err = sink.Write(payload)
	}
	if err == nil {
		err = sink.Close()
	}
	if err != nil {
		sink.Cancel()
		return err
	}
	return nil
}

// Release is a no-op: the snapshot owns nothing beyond its in-memory state.
func (s *RaftSnapshot) Release() {}
// RegisterNode adds node to the cluster state with status "active".
//
// In single-node mode the node is written directly into rc.nodes and the FSM
// state, without Raft. Otherwise this node must be the leader. The registration
// is proposed as a "register" command via raft.Apply with a 5s timeout and,
// once applied, cached in rc.nodes. A non-leader returns an error naming the
// leader when it is known. Errors from replication and from RaftFSM.Apply are
// both returned.
func (rc *RaftCoordinator) RegisterNode(node *Node) error {
	if node == nil {
		return fmt.Errorf("cannot register a nil node")
	}
	now := time.Now().UnixMilli()
	nodeInfo := &NodeInfo{
		ID:        node.ID,
		IP:        node.IP,
		Port:      node.Port,
		Status:    "active",
		LastSeen:  now,
		JoinedAt:  now,
		UpdatedAt: now,
		Version:   1,
	}

	if rc.singleNodeMode {
		rc.logger.Debug("Single-node mode: registering node without Raft consensus")
		rc.nodes.Store(node.ID, nodeInfo)
		rc.fsm.state.mu.Lock()
		rc.fsm.state.Nodes[node.ID] = nodeInfo
		rc.fsm.state.UpdatedAt = now
		rc.fsm.state.mu.Unlock()
		rc.logger.Debug(fmt.Sprintf("Node registered locally in single-node mode: %s (joined at %s)",
			node.ID, time.UnixMilli(now).Format("2006-01-02 15:04:05.000")))
		return nil
	}

	if !rc.IsLeader() {
		leader := rc.GetLeader()
		if leader == nil {
			return fmt.Errorf("node is not the leader and no leader found")
		}
		// JoinedAt is when the leader joined the cluster, not when it became leader.
		rc.logger.Warn(fmt.Sprintf("Current node is not leader. Leader is %s:%d (joined at %s)",
			leader.IP, leader.Port, time.UnixMilli(leader.JoinedAt).Format("15:04:05")))
		return fmt.Errorf("node is not the leader. Please connect to leader at %s:%d", leader.IP, leader.Port)
	}

	cmd := NodeRegistrationCommand{
		Type:      "register",
		Node:      *nodeInfo,
		Timestamp: now,
	}
	data, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	future := rc.raft.Apply(data, 5*time.Second)
	if err := future.Error(); err != nil {
		return fmt.Errorf("failed to register node via raft: %w", err)
	}
	// future.Error reports only replication/commit failures; an error returned
	// by RaftFSM.Apply arrives through Response.
	if applyErr, ok := future.Response().(error); ok && applyErr != nil {
		return fmt.Errorf("failed to register node via raft: %w", applyErr)
	}

	rc.nodes.Store(node.ID, nodeInfo)
	rc.logger.Debug(fmt.Sprintf("Node registered via Raft: %s (joined at %s)",
		node.ID, time.UnixMilli(now).Format("2006-01-02 15:04:05.000")))
	return nil
}
// RemoveNode deletes nodeID from the cluster state.
//
// In single-node mode the node is removed directly from rc.nodes and the FSM
// state. Otherwise this node must be the leader. A "remove" command is proposed
// via raft.Apply with a 5s timeout, and the rc.nodes entry is dropped once it
// is applied. Errors from replication and from RaftFSM.Apply are both returned.
func (rc *RaftCoordinator) RemoveNode(nodeID string) error {
	now := time.Now().UnixMilli()
	if rc.singleNodeMode {
		rc.nodes.Delete(nodeID)
		rc.fsm.state.mu.Lock()
		delete(rc.fsm.state.Nodes, nodeID)
		rc.fsm.state.UpdatedAt = now
		rc.fsm.state.mu.Unlock()
		rc.logger.Debug(fmt.Sprintf("Node removed locally in single-node mode: %s at %s",
			nodeID, time.UnixMilli(now).Format("2006-01-02 15:04:05.000")))
		return nil
	}

	if !rc.IsLeader() {
		return fmt.Errorf("node is not the leader")
	}

	cmd := NodeRegistrationCommand{
		Type:      "remove",
		NodeID:    nodeID,
		Timestamp: now,
	}
	data, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	future := rc.raft.Apply(data, 5*time.Second)
	if err := future.Error(); err != nil {
		return fmt.Errorf("failed to remove node via raft: %w", err)
	}
	// An error returned by RaftFSM.Apply is delivered via Response, not Error.
	if applyErr, ok := future.Response().(error); ok && applyErr != nil {
		return fmt.Errorf("failed to remove node via raft: %w", applyErr)
	}

	rc.nodes.Delete(nodeID)
	rc.logger.Debug(fmt.Sprintf("Node removed via Raft: %s at %s", nodeID,
		time.UnixMilli(now).Format("2006-01-02 15:04:05.000")))
	return nil
}
// GetActiveNodes returns the FSM-state nodes whose LastSeen is less than 30
// seconds in the past. In single-node mode, when none qualify, the local node
// is returned instead. The returned pointers are shared with the FSM state.
func (rc *RaftCoordinator) GetActiveNodes() []*NodeInfo {
	const activeWindowMs = 30000 // 30 seconds
	active := []*NodeInfo{}
	nowMs := time.Now().UnixMilli()
	st := rc.fsm.state
	st.mu.RLock()
	defer st.mu.RUnlock()
	for _, info := range st.Nodes {
		if nowMs-info.LastSeen >= activeWindowMs {
			continue
		}
		active = append(active, info)
	}
	if len(active) == 0 && rc.singleNodeMode && rc.localNodeInfo != nil {
		active = append(active, rc.localNodeInfo)
	}
	return active
}
// GetAllNodes returns every node recorded in the FSM state, however long ago it
// was last seen. In single-node mode an empty state yields the local node. The
// returned pointers are shared with the FSM state.
func (rc *RaftCoordinator) GetAllNodes() []*NodeInfo {
	st := rc.fsm.state
	st.mu.RLock()
	defer st.mu.RUnlock()
	if len(st.Nodes) == 0 && rc.singleNodeMode && rc.localNodeInfo != nil {
		return []*NodeInfo{rc.localNodeInfo}
	}
	all := make([]*NodeInfo, 0, len(st.Nodes))
	for _, info := range st.Nodes {
		all = append(all, info)
	}
	return all
}
// GetLeader returns the NodeInfo of the current Raft leader, or nil if no leader
// is known or it cannot be matched to a node in the FSM state. In single-node
// mode it returns the local node.
//
// Raft reports the leader's Raft transport address (RaftPort), whereas
// NodeInfo.Port holds the node port, so an exact "ip:port" match usually fails.
// If there is no exact match, the node whose IP equals the leader's host is
// returned, but only when exactly one node has that IP.
func (rc *RaftCoordinator) GetLeader() *NodeInfo {
	if rc.singleNodeMode {
		return rc.localNodeInfo
	}
	leaderAddr := string(rc.raft.Leader())
	if leaderAddr == "" {
		return nil
	}
	leaderHost, _, splitErr := net.SplitHostPort(leaderAddr)
	if splitErr != nil {
		leaderHost = ""
	}

	state := rc.fsm.state
	state.mu.RLock()
	defer state.mu.RUnlock()

	var hostMatch *NodeInfo
	hostMatches := 0
	for _, node := range state.Nodes {
		if node == nil {
			continue
		}
		if fmt.Sprintf("%s:%d", node.IP, node.Port) == leaderAddr {
			return node
		}
		if leaderHost != "" && node.IP == leaderHost {
			hostMatch = node
			hostMatches++
		}
	}
	if hostMatches == 1 {
		return hostMatch
	}
	return nil
}
// IsLeader reports whether this node acts as cluster leader. Single-node mode
// always reports true; otherwise the cached leadership flag is returned.
func (rc *RaftCoordinator) IsLeader() bool {
	return rc.singleNodeMode || rc.isLeader.Load()
}

// GetLeaderSince returns the Unix-millisecond time at which this node last
// became leader, or 0 if none was recorded.
func (rc *RaftCoordinator) GetLeaderSince() int64 {
	since := rc.leaderSince.Load()
	return since
}

// GetLeaderSinceStr formats the leadership start time as
// "2006-01-02 15:04:05.000" in local time, or returns "never" when none was
// recorded.
func (rc *RaftCoordinator) GetLeaderSinceStr() string {
	if since := rc.leaderSince.Load(); since != 0 {
		return time.UnixMilli(since).Format("2006-01-02 15:04:05.000")
	}
	return "never"
}

// GetElectionCount returns how many leadership acquisitions have been recorded on this node.
func (rc *RaftCoordinator) GetElectionCount() uint64 {
	count := rc.electionCount.Load()
	return count
}
// SendHeartbeat marks nodeID as alive now. It refreshes LastSeen and UpdatedAt
// on the node's entry in rc.nodes and in the FSM state. Unknown node IDs are
// ignored.
//
// The update touches only this node's local copy of the state and is not
// replicated through Raft. NOTE(review): in multi-node mode LastSeen therefore
// differs between replicas; confirm that is intended.
func (rc *RaftCoordinator) SendHeartbeat(nodeID string) {
	now := time.Now().UnixMilli()
	state := rc.fsm.state

	// Both updates happen under state.mu. In single-node mode RegisterNode
	// stores the same *NodeInfo in rc.nodes and in state.Nodes, and readers such
	// as GetActiveNodes read LastSeen under state.mu. Mutating the rc.nodes
	// entry without the lock would be a data race.
	state.mu.Lock()
	defer state.mu.Unlock()

	if val, ok := rc.nodes.Load(nodeID); ok {
		if info, ok := val.(*NodeInfo); ok && info != nil {
			info.LastSeen = now
			info.UpdatedAt = now
		}
	}
	if info, ok := state.Nodes[nodeID]; ok && info != nil {
		info.LastSeen = now
		info.UpdatedAt = now
	}
}
// GetClusterStatus summarises the cluster: total/active/syncing/failed node
// counts, replication factor, leader ID, a health label (see calculateHealth),
// the coordinator creation time and the current time as UpdatedAt.
//
// The replication factor is taken from GetReplicationFactor, the same source
// the public getter uses.
func (rc *RaftCoordinator) GetClusterStatus() *ClusterStatus {
	nodes := rc.GetAllNodes()
	activeNodes := rc.GetActiveNodes()

	syncingNodes := 0
	for _, node := range nodes {
		if node.Status == "syncing" {
			syncingNodes++
		}
	}

	leaderID := ""
	if leader := rc.GetLeader(); leader != nil {
		leaderID = leader.ID
	}

	// The two node lists come from separate lock acquisitions. A node
	// registered in between can make the active count exceed the total, so
	// never report a negative failure count.
	failedNodes := len(nodes) - len(activeNodes)
	if failedNodes < 0 {
		failedNodes = 0
	}

	now := time.Now().UnixMilli()
	return &ClusterStatus{
		Name:              rc.clusterName,
		TotalNodes:        len(nodes),
		ActiveNodes:       len(activeNodes),
		SyncingNodes:      syncingNodes,
		FailedNodes:       failedNodes,
		ReplicationFactor: rc.GetReplicationFactor(),
		LeaderID:          leaderID,
		Health:            rc.calculateHealth(),
		CreatedAt:         rc.createdAt,
		UpdatedAt:         now,
	}
}
// calculateHealth classifies the cluster by its share of active nodes:
// "healthy" at 80% or more, "degraded" at 50% or more, otherwise "critical".
// An empty node list is "critical".
func (rc *RaftCoordinator) calculateHealth() string {
	active := len(rc.GetActiveNodes())
	total := len(rc.GetAllNodes())
	if total == 0 {
		return "critical"
	}
	switch ratio := float64(active) / float64(total); {
	case ratio >= 0.8:
		return "healthy"
	case ratio >= 0.5:
		return "degraded"
	default:
		return "critical"
	}
}
// GetReplicationFactor returns the cluster's replication factor.
//
// The value in the replicated FSM state is authoritative. It is updated on
// every replica when a "set_replication_factor" command is applied, and when a
// snapshot is restored. The rc.replicationFactor field is only updated on the
// leader that issued SetReplicationFactor, so it would be stale on followers.
// That field serves as a fallback when the FSM holds no positive value.
func (rc *RaftCoordinator) GetReplicationFactor() int {
	if rc.fsm != nil && rc.fsm.state != nil {
		rc.fsm.state.mu.RLock()
		factor := rc.fsm.state.ReplicationFactor
		rc.fsm.state.mu.RUnlock()
		if factor > 0 {
			return int(factor)
		}
	}
	return int(rc.replicationFactor.Load())
}
// SetReplicationFactor proposes a new replication factor through Raft. It must
// be called on the leader. factor must be a positive value that fits in an
// int32; anything else is rejected before touching Raft, so a non-positive or
// silently truncated factor never enters the replicated log. Errors from
// replication and from RaftFSM.Apply are both returned. On success the factor
// is also cached locally.
func (rc *RaftCoordinator) SetReplicationFactor(factor int) error {
	now := time.Now().UnixMilli()
	if factor < 1 || int(int32(factor)) != factor {
		return fmt.Errorf("invalid replication factor %d: must be a positive 32-bit integer", factor)
	}
	if !rc.IsLeader() {
		return fmt.Errorf("node is not the leader")
	}

	cmd := NodeRegistrationCommand{
		Type:      "set_replication_factor",
		Factor:    int32(factor),
		Timestamp: now,
	}
	data, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	future := rc.raft.Apply(data, 5*time.Second)
	if err := future.Error(); err != nil {
		return fmt.Errorf("failed to set replication factor via raft: %w", err)
	}
	// An error returned by RaftFSM.Apply is delivered via Response, not Error.
	if applyErr, ok := future.Response().(error); ok && applyErr != nil {
		return fmt.Errorf("failed to set replication factor via raft: %w", applyErr)
	}

	rc.replicationFactor.Store(int32(factor))
	rc.logger.Debug(fmt.Sprintf("Replication factor set to %d via Raft at %s", factor,
		time.UnixMilli(now).Format("2006-01-02 15:04:05.000")))
	return nil
}
// GetClusterHealth builds a per-node health report from the FSM state.
//
// Scoring starts at 100. A node not seen for 30 seconds or more is marked
// "offline" and costs 10 points; a "syncing" node costs 5. The 30-second
// boundary matches GetActiveNodes, so every node is either active or offline.
// The score is clamped at 0, and a recommendation string is chosen from it.
func (rc *RaftCoordinator) GetClusterHealth() *ClusterHealth {
	const offlineAfterMs = 30000 // 30 seconds
	now := time.Now().UnixMilli()
	health := &ClusterHealth{
		Nodes:           make(map[string]*NodeHealth),
		OverallScore:    100.0,
		CheckedAt:       now,
		Recommendations: "",
	}

	state := rc.fsm.state
	state.mu.RLock()
	defer state.mu.RUnlock()

	for nodeID, nodeInfo := range state.Nodes {
		nodeHealth := &NodeHealth{
			Status:       nodeInfo.Status,
			LatencyMs:    0,
			LastCheck:    now,
			LastSuccess:  nodeInfo.LastSeen,
			FailureCount: 0,
		}
		switch {
		case now-nodeInfo.LastSeen >= offlineAfterMs:
			nodeHealth.Status = "offline"
			nodeHealth.LastFailure = nodeInfo.LastSeen
			nodeHealth.FailureCount++
			health.OverallScore -= 10
		case nodeInfo.Status == "syncing":
			health.OverallScore -= 5
		}
		health.Nodes[nodeID] = nodeHealth
	}

	// With more than ten offline nodes the deductions would push the score below zero.
	if health.OverallScore < 0 {
		health.OverallScore = 0
	}

	switch {
	case health.OverallScore < 50:
		health.Recommendations = fmt.Sprintf("Critical: Check network connectivity and node health immediately (last checked: %s)",
			time.UnixMilli(now).Format("2006-01-02 15:04:05.000"))
	case health.OverallScore < 80:
		health.Recommendations = "Warning: Some nodes are offline or syncing, consider adding more nodes"
	default:
		health.Recommendations = "Cluster is healthy, all systems operational"
	}
	return health
}
// IsReplicationEnabled reports the cfg.Replication.Enabled value captured at construction.
func (rc *RaftCoordinator) IsReplicationEnabled() bool {
	enabled := rc.replicationEnabled
	return enabled
}

// IsMasterMasterEnabled reports the cfg.Replication.MasterMaster value captured at construction.
func (rc *RaftCoordinator) IsMasterMasterEnabled() bool {
	enabled := rc.masterMasterEnabled
	return enabled
}

// IsSyncReplicationEnabled reports the cfg.Replication.SyncReplication value captured at construction.
func (rc *RaftCoordinator) IsSyncReplicationEnabled() bool {
	enabled := rc.syncReplication
	return enabled
}

// GetClusterCreatedAt returns, in Unix milliseconds, when this coordinator was
// constructed. This is not when the Raft cluster was first bootstrapped.
func (rc *RaftCoordinator) GetClusterCreatedAt() int64 {
	created := rc.createdAt
	return created
}

// GetClusterCreatedAtStr formats the coordinator creation time as
// "2006-01-02 15:04:05.000" in local time.
func (rc *RaftCoordinator) GetClusterCreatedAtStr() string {
	created := time.UnixMilli(rc.createdAt)
	return created.Format("2006-01-02 15:04:05.000")
}

// GetClusterUptime returns the time since the coordinator was constructed, at
// millisecond resolution.
func (rc *RaftCoordinator) GetClusterUptime() time.Duration {
	elapsedMs := time.Now().UnixMilli() - rc.createdAt
	return time.Millisecond * time.Duration(elapsedMs)
}
// Stop shuts the coordinator down. It:
//   - closes stopChan, which ends monitorLeadership;
//   - shuts Raft down and waits for the shutdown to finish;
//   - logs the uptime.
//
// Waiting on the shutdown future matters because hashicorp/raft closes the
// network transport inside that future's Error method. Without it the Raft
// listening port is never released. Calling Stop again after it has returned
// is a no-op instead of panicking on a double close; concurrent calls are not
// synchronised.
func (rc *RaftCoordinator) Stop() {
	now := time.Now().UnixMilli()
	select {
	case <-rc.stopChan:
		// Already stopped.
		return
	default:
		close(rc.stopChan)
	}
	if rc.raft != nil {
		if err := rc.raft.Shutdown().Error(); err != nil {
			rc.logger.Warn(fmt.Sprintf("Raft shutdown error: %v", err))
		}
	}
	rc.logger.Debug(fmt.Sprintf("Raft coordinator stopped at %s (uptime: %s)",
		time.UnixMilli(now).Format("2006-01-02 15:04:05.000"), rc.GetClusterUptime()))
}