Files
futriix/internal/cluster/raft_coordinator.go

889 lines
26 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/cluster/raft_coordinator.go
// Назначение: Реализация координатора распределённого кластера на основе Raft консенсус-алгоритма.
// Обеспечивает управление узлами кластера, выборы лидера, репликацию данных и отказоустойчивость.
// Поддерживает как одноузловой режим работы, так и многокластерную конфигурацию с синхронной/асинхронной репликацией.
package cluster
import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/raft"
"futriis/internal/log"
"futriis/internal/config"
)
// InmemStore реализует встроенное файловое хранилище для Raft
// Реализует интерфейсы raft.LogStore и raft.StableStore
type InmemStore struct {
mu sync.RWMutex
data map[string][]byte
path string
}
// NewInmemStore создаёт новое хранилище
func NewInmemStore(path string) *InmemStore {
store := &InmemStore{
data: make(map[string][]byte),
path: path,
}
// Пытаемся загрузить существующие данные
store.load()
return store
}
// load загружает данные из файла
func (s *InmemStore) load() {
if s.path == "" {
return
}
data, err := os.ReadFile(s.path)
if err != nil {
return
}
json.Unmarshal(data, &s.data)
}
// save сохраняет данные в файл
func (s *InmemStore) save() {
if s.path == "" {
return
}
data, _ := json.Marshal(s.data)
os.WriteFile(s.path, data, 0644)
}
// ==================== Реализация raft.LogStore ====================
// FirstIndex возвращает первый индекс в логе
func (s *InmemStore) FirstIndex() (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var first uint64 = 0
for key := range s.data {
var idx uint64
if _, err := fmt.Sscanf(key, "log-%d", &idx); err == nil {
if first == 0 || idx < first {
first = idx
}
}
}
return first, nil
}
// LastIndex возвращает последний индекс в логе
func (s *InmemStore) LastIndex() (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var last uint64 = 0
for key := range s.data {
var idx uint64
if _, err := fmt.Sscanf(key, "log-%d", &idx); err == nil {
if idx > last {
last = idx
}
}
}
return last, nil
}
// GetLog получает лог по индексу
func (s *InmemStore) GetLog(idx uint64, log *raft.Log) error {
s.mu.RLock()
defer s.mu.RUnlock()
key := fmt.Sprintf("log-%d", idx)
data, ok := s.data[key]
if !ok {
return raft.ErrLogNotFound
}
return json.Unmarshal(data, log)
}
// StoreLog сохраняет один лог
func (s *InmemStore) StoreLog(log *raft.Log) error {
return s.StoreLogs([]*raft.Log{log})
}
// StoreLogs сохраняет несколько логов
func (s *InmemStore) StoreLogs(logs []*raft.Log) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, log := range logs {
key := fmt.Sprintf("log-%d", log.Index)
data, err := json.Marshal(log)
if err != nil {
return err
}
s.data[key] = data
}
s.save()
return nil
}
// DeleteRange удаляет диапазон логов
func (s *InmemStore) DeleteRange(min, max uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
for idx := min; idx <= max; idx++ {
key := fmt.Sprintf("log-%d", idx)
delete(s.data, key)
}
s.save()
return nil
}
// ==================== Реализация raft.StableStore ====================
// Get реализует raft.StableStore
func (s *InmemStore) Get(key []byte) ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
val, ok := s.data[string(key)]
if !ok {
return nil, nil
}
return val, nil
}
// Set реализует raft.StableStore
func (s *InmemStore) Set(key []byte, val []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
s.data[string(key)] = val
s.save()
return nil
}
// SetUint64 реализует raft.StableStore
func (s *InmemStore) SetUint64(key []byte, val uint64) error {
return s.Set(key, []byte(fmt.Sprintf("%d", val)))
}
// GetUint64 реализует raft.StableStore
func (s *InmemStore) GetUint64(key []byte) (uint64, error) {
val, err := s.Get(key)
if err != nil {
return 0, err
}
if val == nil {
return 0, nil
}
var result uint64
fmt.Sscanf(string(val), "%d", &result)
return result, nil
}
// RaftClusterState представляет состояние кластера для Raft FSM
type RaftClusterState struct {
Nodes map[string]*NodeInfo `json:"nodes"`
ReplicationFactor int32 `json:"replication_factor"`
mu sync.RWMutex
}
// RaftCoordinator реализует координацию кластера через Raft
type RaftCoordinator struct {
raft *raft.Raft
fsm *RaftFSM
address string
raftAddr string
clusterName string
logger *log.Logger
config *config.Config
stopChan chan struct{}
nodes sync.Map
replicationFactor atomic.Int32
replicationEnabled bool
masterMasterEnabled bool
syncReplication bool
isLeader atomic.Bool
leaderMonitor chan bool
singleNodeMode bool
localNodeInfo *NodeInfo
logStore *InmemStore
stableStore *InmemStore
}
// RaftFSM реализует конечный автомат для Raft
type RaftFSM struct {
state *RaftClusterState
logger *log.Logger
}
// NodeRegistrationCommand команда регистрации узла
type NodeRegistrationCommand struct {
Type string `json:"type"`
Node NodeInfo `json:"node,omitempty"`
NodeID string `json:"node_id,omitempty"`
Factor int32 `json:"factor,omitempty"`
}
// getLocalIP получает локальный IP адрес
func getLocalIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "127.0.0.1"
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
return "127.0.0.1"
}
// NewRaftCoordinator создаёт новый Raft координатор
func NewRaftCoordinator(cfg *config.Config, logger *log.Logger) (*RaftCoordinator, error) {
nodeIP := cfg.Cluster.NodeIP
if nodeIP == "" || nodeIP == "0.0.0.0" {
nodeIP = getLocalIP()
}
raftAddr := fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.RaftPort)
logger.Debug(fmt.Sprintf("Creating Raft coordinator at %s", raftAddr))
singleNodeMode := len(cfg.Cluster.Nodes) <= 1 || cfg.Cluster.Bootstrap
rc := &RaftCoordinator{
address: fmt.Sprintf("%s:%d", nodeIP, cfg.Cluster.NodePort),
raftAddr: raftAddr,
clusterName: cfg.Cluster.Name,
logger: logger,
config: cfg,
stopChan: make(chan struct{}),
leaderMonitor: make(chan bool, 1),
replicationEnabled: cfg.Replication.Enabled,
masterMasterEnabled: cfg.Replication.MasterMaster,
syncReplication: cfg.Replication.SyncReplication,
singleNodeMode: singleNodeMode,
localNodeInfo: &NodeInfo{
ID: fmt.Sprintf("%s-%s", cfg.Cluster.Name, nodeIP),
IP: nodeIP,
Port: cfg.Cluster.NodePort,
Status: "active",
LastSeen: time.Now().Unix(),
},
}
rc.replicationFactor.Store(int32(3))
rc.fsm = &RaftFSM{
state: &RaftClusterState{
Nodes: make(map[string]*NodeInfo),
},
logger: logger,
}
if singleNodeMode {
rc.fsm.state.mu.Lock()
rc.fsm.state.Nodes[rc.localNodeInfo.ID] = rc.localNodeInfo
rc.fsm.state.mu.Unlock()
rc.isLeader.Store(true)
logger.Debug(fmt.Sprintf("Single-node mode: local node added to state: %s", rc.localNodeInfo.ID))
}
raftConfig := raft.DefaultConfig()
raftConfig.LocalID = raft.ServerID(rc.localNodeInfo.ID)
raftConfig.HeartbeatTimeout = 1000 * time.Millisecond
raftConfig.ElectionTimeout = 1000 * time.Millisecond
raftConfig.CommitTimeout = 500 * time.Millisecond
raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond
if singleNodeMode {
raftConfig.HeartbeatTimeout = 500 * time.Millisecond
raftConfig.ElectionTimeout = 500 * time.Millisecond
raftConfig.LeaderLeaseTimeout = 500 * time.Millisecond
raftConfig.LogOutput = io.Discard
logger.Debug("Running in single-node mode (warnings suppressed)")
} else {
raftConfig.LogOutput = os.Stderr
}
dataDir := cfg.Cluster.RaftDataDir
if err := os.MkdirAll(dataDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create raft data dir: %v", err)
}
logger.Debug(fmt.Sprintf("Raft data directory: %s", dataDir))
// Используем встроенные хранилища вместо boltDB
rc.logStore = NewInmemStore(filepath.Join(dataDir, "raft-log.json"))
rc.stableStore = NewInmemStore(filepath.Join(dataDir, "raft-stable.json"))
// Создаём снапшот хранилище
snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot store: %v", err)
}
// Создаём транспорт
transport, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, os.Stderr)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %v", err)
}
// Создаём Raft инстанс - используем оба хранилища
r, err := raft.NewRaft(raftConfig, rc.fsm, rc.logStore, rc.stableStore, snapshotStore, transport)
if err != nil {
return nil, fmt.Errorf("failed to create raft: %v", err)
}
rc.raft = r
time.Sleep(500 * time.Millisecond)
bootstrapPath := filepath.Join(dataDir, "raft-log.json")
_, statErr := os.Stat(bootstrapPath)
needsBootstrap := os.IsNotExist(statErr)
if needsBootstrap && singleNodeMode {
logger.Debug("Bootstrapping single-node cluster...")
configuration := raft.Configuration{
Servers: []raft.Server{
{
ID: raftConfig.LocalID,
Address: transport.LocalAddr(),
},
},
}
future := r.BootstrapCluster(configuration)
if err := future.Error(); err != nil {
logger.Warn(fmt.Sprintf("Bootstrap error: %v", err))
} else {
logger.Debug("Single-node cluster bootstrapped successfully")
}
time.Sleep(1 * time.Second)
} else if needsBootstrap && len(cfg.Cluster.Nodes) > 1 {
logger.Debug("Bootstrapping multi-node cluster...")
servers := make([]raft.Server, 0, len(cfg.Cluster.Nodes))
for i, nodeAddr := range cfg.Cluster.Nodes {
serverID := raft.ServerID(fmt.Sprintf("%s-node%d", rc.clusterName, i+1))
servers = append(servers, raft.Server{
ID: serverID,
Address: raft.ServerAddress(nodeAddr),
})
}
configuration := raft.Configuration{
Servers: servers,
}
future := r.BootstrapCluster(configuration)
if err := future.Error(); err != nil {
logger.Warn(fmt.Sprintf("Bootstrap error: %v", err))
} else {
logger.Debug("Multi-node cluster bootstrapped successfully")
}
go rc.monitorLeadership()
logger.Debug("Waiting for leader election...")
timeout := time.After(5 * time.Second)
leaderElected := false
for !leaderElected {
select {
case isLeader := <-rc.leaderMonitor:
if isLeader {
leaderElected = true
rc.isLeader.Store(true)
logger.Debug("This node is now the cluster leader")
}
case <-timeout:
logger.Warn("Leader election timeout")
leaderElected = true
}
}
} else {
logger.Debug("Existing Raft state found, joining cluster...")
go rc.monitorLeadership()
if !singleNodeMode {
time.Sleep(1 * time.Second)
if r.State() == raft.Leader {
rc.isLeader.Store(true)
logger.Debug("This node is the cluster leader")
}
}
}
logger.Debug(fmt.Sprintf("Raft coordinator started at %s, IsLeader: %v, SingleNodeMode: %v", raftAddr, rc.isLeader.Load(), singleNodeMode))
return rc, nil
}
// monitorLeadership отслеживает изменения лидера
func (rc *RaftCoordinator) monitorLeadership() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
wasLeader := false
for {
select {
case <-rc.stopChan:
return
case <-ticker.C:
if rc.raft == nil {
continue
}
isLeader := rc.raft.State() == raft.Leader
if isLeader != wasLeader {
wasLeader = isLeader
select {
case rc.leaderMonitor <- isLeader:
default:
}
if isLeader {
rc.isLeader.Store(true)
rc.logger.Debug("Leadership acquired")
} else {
rc.isLeader.Store(false)
rc.logger.Debug("Leadership lost")
}
}
}
}
}
// Apply применяет команду к FSM
func (f *RaftFSM) Apply(log *raft.Log) interface{} {
var cmd NodeRegistrationCommand
if err := json.Unmarshal(log.Data, &cmd); err != nil {
f.logger.Error(fmt.Sprintf("Failed to unmarshal raft command: %v", err))
return err
}
f.state.mu.Lock()
defer f.state.mu.Unlock()
switch cmd.Type {
case "register":
f.state.Nodes[cmd.Node.ID] = &cmd.Node
f.logger.Debug(fmt.Sprintf("Raft: Node registered: %s", cmd.Node.ID))
case "remove":
delete(f.state.Nodes, cmd.NodeID)
f.logger.Debug(fmt.Sprintf("Raft: Node removed: %s", cmd.NodeID))
case "set_replication_factor":
f.state.ReplicationFactor = cmd.Factor
f.logger.Debug(fmt.Sprintf("Raft: Replication factor set to %d", cmd.Factor))
}
return nil
}
// Snapshot реализует создание снапшота
func (f *RaftFSM) Snapshot() (raft.FSMSnapshot, error) {
f.state.mu.RLock()
defer f.state.mu.RUnlock()
stateCopy := &RaftClusterState{
Nodes: make(map[string]*NodeInfo),
ReplicationFactor: f.state.ReplicationFactor,
}
for k, v := range f.state.Nodes {
stateCopy.Nodes[k] = v
}
return &RaftSnapshot{state: stateCopy}, nil
}
// Restore восстанавливает состояние из снапшота
func (f *RaftFSM) Restore(snapshot io.ReadCloser) error {
defer snapshot.Close()
var state RaftClusterState
decoder := json.NewDecoder(snapshot)
if err := decoder.Decode(&state); err != nil {
return err
}
f.state.mu.Lock()
defer f.state.mu.Unlock()
f.state.Nodes = state.Nodes
f.state.ReplicationFactor = state.ReplicationFactor
return nil
}
// RaftSnapshot реализует интерфейс FSMSnapshot
type RaftSnapshot struct {
state *RaftClusterState
}
// Persist сохраняет снапшот
func (s *RaftSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error {
data, err := json.Marshal(s.state)
if err != nil {
return err
}
if _, err := sink.Write(data); err != nil {
return err
}
return sink.Close()
}()
if err != nil {
sink.Cancel()
return err
}
return nil
}
// Release освобождает ресурсы
func (s *RaftSnapshot) Release() {}
// RegisterNode регистрирует узел через Raft
func (rc *RaftCoordinator) RegisterNode(node *Node) error {
nodeInfo := &NodeInfo{
ID: node.ID,
IP: node.IP,
Port: node.Port,
Status: "active",
LastSeen: time.Now().Unix(),
}
if rc.singleNodeMode {
rc.logger.Debug("Single-node mode: registering node without Raft consensus")
rc.nodes.Store(node.ID, nodeInfo)
rc.fsm.state.mu.Lock()
rc.fsm.state.Nodes[node.ID] = nodeInfo
rc.fsm.state.mu.Unlock()
rc.logger.Debug(fmt.Sprintf("Node registered locally in single-node mode: %s", node.ID))
return nil
}
if !rc.IsLeader() {
leader := rc.GetLeader()
if leader != nil {
rc.logger.Warn(fmt.Sprintf("Current node is not leader. Leader is %s:%d", leader.IP, leader.Port))
return fmt.Errorf("node is not the leader. Please connect to leader at %s:%d", leader.IP, leader.Port)
}
return fmt.Errorf("node is not the leader and no leader found")
}
cmd := NodeRegistrationCommand{
Type: "register",
Node: *nodeInfo,
}
data, err := json.Marshal(cmd)
if err != nil {
return err
}
future := rc.raft.Apply(data, 5*time.Second)
if err := future.Error(); err != nil {
return fmt.Errorf("failed to register node via raft: %v", err)
}
rc.nodes.Store(node.ID, nodeInfo)
rc.logger.Debug(fmt.Sprintf("Node registered via Raft: %s", node.ID))
return nil
}
// RemoveNode удаляет узел через Raft
func (rc *RaftCoordinator) RemoveNode(nodeID string) error {
if rc.singleNodeMode {
rc.nodes.Delete(nodeID)
rc.fsm.state.mu.Lock()
delete(rc.fsm.state.Nodes, nodeID)
rc.fsm.state.mu.Unlock()
rc.logger.Debug(fmt.Sprintf("Node removed locally in single-node mode: %s", nodeID))
return nil
}
if !rc.IsLeader() {
return fmt.Errorf("node is not the leader")
}
cmd := NodeRegistrationCommand{
Type: "remove",
NodeID: nodeID,
}
data, err := json.Marshal(cmd)
if err != nil {
return err
}
future := rc.raft.Apply(data, 5*time.Second)
if err := future.Error(); err != nil {
return fmt.Errorf("failed to remove node via raft: %v", err)
}
rc.nodes.Delete(nodeID)
rc.logger.Debug(fmt.Sprintf("Node removed via Raft: %s", nodeID))
return nil
}
// GetActiveNodes возвращает активные узлы
func (rc *RaftCoordinator) GetActiveNodes() []*NodeInfo {
nodes := make([]*NodeInfo, 0)
now := time.Now().Unix()
state := rc.fsm.state
state.mu.RLock()
defer state.mu.RUnlock()
for _, nodeInfo := range state.Nodes {
if now-nodeInfo.LastSeen < 30 {
nodes = append(nodes, nodeInfo)
}
}
if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil {
nodes = append(nodes, rc.localNodeInfo)
}
return nodes
}
// GetAllNodes возвращает все узлы
func (rc *RaftCoordinator) GetAllNodes() []*NodeInfo {
state := rc.fsm.state
state.mu.RLock()
defer state.mu.RUnlock()
nodes := make([]*NodeInfo, 0, len(state.Nodes))
for _, node := range state.Nodes {
nodes = append(nodes, node)
}
if rc.singleNodeMode && len(nodes) == 0 && rc.localNodeInfo != nil {
nodes = append(nodes, rc.localNodeInfo)
}
return nodes
}
// GetLeader возвращает лидера
func (rc *RaftCoordinator) GetLeader() *NodeInfo {
if rc.singleNodeMode {
return rc.localNodeInfo
}
leaderAddr := rc.raft.Leader()
if leaderAddr == "" {
return nil
}
state := rc.fsm.state
state.mu.RLock()
defer state.mu.RUnlock()
for _, node := range state.Nodes {
nodeAddr := fmt.Sprintf("%s:%d", node.IP, node.Port)
if nodeAddr == string(leaderAddr) {
return node
}
}
return nil
}
// IsLeader проверяет, является ли текущий узел лидером
func (rc *RaftCoordinator) IsLeader() bool {
if rc.singleNodeMode {
return true
}
return rc.isLeader.Load()
}
// SendHeartbeat обновляет heartbeat узла
func (rc *RaftCoordinator) SendHeartbeat(nodeID string) {
if val, ok := rc.nodes.Load(nodeID); ok {
nodeInfo := val.(*NodeInfo)
nodeInfo.LastSeen = time.Now().Unix()
rc.nodes.Store(nodeID, nodeInfo)
}
rc.fsm.state.mu.Lock()
if nodeInfo, ok := rc.fsm.state.Nodes[nodeID]; ok {
nodeInfo.LastSeen = time.Now().Unix()
}
rc.fsm.state.mu.Unlock()
}
// GetClusterStatus возвращает статус кластера
func (rc *RaftCoordinator) GetClusterStatus() *ClusterStatus {
nodes := rc.GetAllNodes()
activeNodes := rc.GetActiveNodes()
syncingNodes := 0
for _, node := range nodes {
if node.Status == "syncing" {
syncingNodes++
}
}
leader := rc.GetLeader()
leaderID := ""
if leader != nil {
leaderID = leader.ID
}
return &ClusterStatus{
Name: rc.clusterName,
TotalNodes: len(nodes),
ActiveNodes: len(activeNodes),
SyncingNodes: syncingNodes,
FailedNodes: len(nodes) - len(activeNodes),
ReplicationFactor: int(rc.replicationFactor.Load()),
LeaderID: leaderID,
Health: rc.calculateHealth(),
}
}
// calculateHealth вычисляет здоровье кластера
func (rc *RaftCoordinator) calculateHealth() string {
activeNodes := rc.GetActiveNodes()
totalNodes := rc.GetAllNodes()
if len(totalNodes) == 0 {
return "critical"
}
ratio := float64(len(activeNodes)) / float64(len(totalNodes))
if ratio >= 0.8 {
return "healthy"
} else if ratio >= 0.5 {
return "degraded"
}
return "critical"
}
// GetReplicationFactor возвращает фактор репликации
func (rc *RaftCoordinator) GetReplicationFactor() int {
return int(rc.replicationFactor.Load())
}
// SetReplicationFactor устанавливает фактор репликации через Raft
func (rc *RaftCoordinator) SetReplicationFactor(factor int) error {
if !rc.IsLeader() {
return fmt.Errorf("node is not the leader")
}
cmd := NodeRegistrationCommand{
Type: "set_replication_factor",
Factor: int32(factor),
}
data, err := json.Marshal(cmd)
if err != nil {
return err
}
future := rc.raft.Apply(data, 5*time.Second)
if err := future.Error(); err != nil {
return fmt.Errorf("failed to set replication factor via raft: %v", err)
}
rc.replicationFactor.Store(int32(factor))
rc.logger.Debug(fmt.Sprintf("Replication factor set to %d via Raft", factor))
return nil
}
// GetClusterHealth возвращает детальную информацию о здоровье кластера
func (rc *RaftCoordinator) GetClusterHealth() *ClusterHealth {
health := &ClusterHealth{
Nodes: make(map[string]*NodeHealth),
OverallScore: 100.0,
Recommendations: "",
}
now := time.Now().Unix()
state := rc.fsm.state
state.mu.RLock()
defer state.mu.RUnlock()
for nodeID, nodeInfo := range state.Nodes {
nodeHealth := &NodeHealth{
Status: nodeInfo.Status,
LatencyMs: 0,
LastCheck: now,
}
if now-nodeInfo.LastSeen > 30 {
nodeHealth.Status = "offline"
health.OverallScore -= 10
} else if nodeInfo.Status == "syncing" {
health.OverallScore -= 5
}
health.Nodes[nodeID] = nodeHealth
}
if health.OverallScore < 50 {
health.Recommendations = "Critical: Check network connectivity and node health immediately"
} else if health.OverallScore < 80 {
health.Recommendations = "Warning: Some nodes are offline or syncing, consider adding more nodes"
} else {
health.Recommendations = "Cluster is healthy, all systems operational"
}
return health
}
// IsReplicationEnabled возвращает статус репликации
func (rc *RaftCoordinator) IsReplicationEnabled() bool {
return rc.replicationEnabled
}
// IsMasterMasterEnabled возвращает статус мастер-мастер репликации
func (rc *RaftCoordinator) IsMasterMasterEnabled() bool {
return rc.masterMasterEnabled
}
// IsSyncReplicationEnabled возвращает статус синхронной репликации
func (rc *RaftCoordinator) IsSyncReplicationEnabled() bool {
return rc.syncReplication
}
// Stop останавливает координатор
func (rc *RaftCoordinator) Stop() {
close(rc.stopChan)
if rc.raft != nil {
rc.raft.Shutdown()
}
rc.logger.Debug("Raft coordinator stopped")
}