337 lines
8.4 KiB
Go
Raw Permalink Normal View History

2026-02-23 22:48:31 +03:00
// /futriis/internal/cluster/node.go
// Пакет cluster реализует управление кластером и репликацию
package cluster
import (
"encoding/json"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"futriis/pkg/config"
"futriis/pkg/utils"
)
// NodeState состояние узла
type NodeState int
const (
StateOffline NodeState = iota
StateJoining
StateOnline
StateLeaving
StateFailed
)
func (s NodeState) String() string {
switch s {
case StateOffline:
return "offline"
case StateJoining:
return "joining"
case StateOnline:
return "online"
case StateLeaving:
return "leaving"
case StateFailed:
return "failed"
default:
return "unknown"
}
}
// Node представляет узел кластера
type Node struct {
ID string `json:"id"`
Address string `json:"address"`
State NodeState `json:"state"`
LastSeen time.Time `json:"last_seen"`
ShardIDs []string `json:"shard_ids"`
mu sync.RWMutex
}
// ClusterManager управляет кластером
type ClusterManager struct {
nodes map[string]*Node
coordinatorAddr string
nodeID string
localAddr string
isCoordinator bool
mu sync.RWMutex
heartbeatStop chan struct{}
stats struct {
totalNodes int64
activeNodes int64
rebalancingCnt int64
}
}
// NewClusterManager создаёт новый менеджер кластера
func NewClusterManager(cfg *config.Config) *ClusterManager {
cm := &ClusterManager{
nodes: make(map[string]*Node),
coordinatorAddr: cfg.Cluster.CoordinatorAddress,
nodeID: cfg.Node.ID,
localAddr: cfg.Node.Address,
isCoordinator: cfg.Node.Address == cfg.Cluster.CoordinatorAddress,
heartbeatStop: make(chan struct{}),
}
// Добавляем себя в кластер
cm.addNode(&Node{
ID: cfg.Node.ID,
Address: cfg.Node.Address,
State: StateOnline,
LastSeen: time.Now(),
})
return cm
}
// Start запускает кластерные сервисы
func (cm *ClusterManager) Start() error {
if cm.isCoordinator {
// Запускаем координатор
go cm.startCoordinator()
}
// Запускаем heartbeat
go cm.heartbeatLoop()
utils.PrintInfo("Кластерный менеджер запущен")
return nil
}
// Stop останавливает кластерные сервисы
func (cm *ClusterManager) Stop() {
close(cm.heartbeatStop)
}
// heartbeatLoop отправляет heartbeat сигналы
func (cm *ClusterManager) heartbeatLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cm.sendHeartbeat()
case <-cm.heartbeatStop:
return
}
}
}
// sendHeartbeat отправляет heartbeat координатору
func (cm *ClusterManager) sendHeartbeat() {
if !cm.isCoordinator {
// Отправляем heartbeat координатору
conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second)
if err != nil {
return
}
defer conn.Close()
heartbeat := map[string]interface{}{
"type": "heartbeat",
"node_id": cm.nodeID,
"address": cm.localAddr,
"time": time.Now().Unix(),
}
json.NewEncoder(conn).Encode(heartbeat)
}
}
// startCoordinator запускает координатор кластера
func (cm *ClusterManager) startCoordinator() {
listener, err := net.Listen("tcp", cm.coordinatorAddr)
if err != nil {
utils.PrintError("Ошибка запуска координатора: %v", err)
return
}
defer listener.Close()
utils.PrintInfo("Координатор кластера запущен на " + cm.coordinatorAddr)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go cm.handleCoordinatorRequest(conn)
}
}
// handleCoordinatorRequest обрабатывает запросы к координатору
func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) {
defer conn.Close()
var req map[string]interface{}
if err := json.NewDecoder(conn).Decode(&req); err != nil {
return
}
msgType, _ := req["type"].(string)
switch msgType {
case "heartbeat":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.updateNodeHeartbeat(nodeID, address)
case "join":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.handleNodeJoin(nodeID, address)
case "leave":
nodeID, _ := req["node_id"].(string)
cm.handleNodeLeave(nodeID)
}
}
// handleNodeJoin обрабатывает присоединение узла
func (cm *ClusterManager) handleNodeJoin(nodeID, address string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.State = StateOnline
node.LastSeen = time.Now()
} else {
cm.nodes[nodeID] = &Node{
ID: nodeID,
Address: address,
State: StateOnline,
LastSeen: time.Now(),
}
atomic.AddInt64(&cm.stats.totalNodes, 1)
}
atomic.AddInt64(&cm.stats.activeNodes, 1)
utils.PrintSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address)
}
// handleNodeLeave обрабатывает отключение узла
func (cm *ClusterManager) handleNodeLeave(nodeID string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.State = StateOffline
atomic.AddInt64(&cm.stats.activeNodes, -1)
utils.PrintWarning("Узел %s покинул кластер", nodeID)
}
}
// updateNodeHeartbeat обновляет время последнего heartbeat узла
func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.LastSeen = time.Now()
if node.State == StateOffline {
node.State = StateOnline
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
} else {
cm.nodes[nodeID] = &Node{
ID: nodeID,
Address: address,
State: StateOnline,
LastSeen: time.Now(),
}
atomic.AddInt64(&cm.stats.totalNodes, 1)
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
}
// AddNode добавляет новый узел в кластер
func (cm *ClusterManager) AddNode(address string) error {
if !cm.isCoordinator {
return errors.New("только координатор может добавлять узлы")
}
cm.mu.Lock()
defer cm.mu.Unlock()
// Генерируем ID для нового узла
nodeID := fmt.Sprintf("node-%d", len(cm.nodes)+1)
cm.nodes[nodeID] = &Node{
ID: nodeID,
Address: address,
State: StateJoining,
LastSeen: time.Now(),
}
atomic.AddInt64(&cm.stats.totalNodes, 1)
utils.PrintSuccess("Узел %s (%s) добавлен в кластер", nodeID, address)
return nil
}
// RemoveNode удаляет узел из кластера
func (cm *ClusterManager) RemoveNode(nodeID string) error {
if !cm.isCoordinator {
return errors.New("только координатор может удалять узлы")
}
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.State = StateLeaving
delete(cm.nodes, nodeID)
atomic.AddInt64(&cm.stats.totalNodes, -1)
if node.State == StateOnline {
atomic.AddInt64(&cm.stats.activeNodes, -1)
}
utils.PrintSuccess("Узел %s удален из кластера", nodeID)
return nil
}
return errors.New("узел не найден")
}
// GetClusterStatus возвращает статус кластера
func (cm *ClusterManager) GetClusterStatus() map[string]interface{} {
cm.mu.RLock()
defer cm.mu.RUnlock()
status := make(map[string]interface{})
status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes)
status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes)
status["is_coordinator"] = cm.isCoordinator
status["coordinator"] = cm.coordinatorAddr
nodes := make([]map[string]interface{}, 0, len(cm.nodes))
for id, node := range cm.nodes {
nodes = append(nodes, map[string]interface{}{
"id": id,
"address": node.Address,
"state": node.State.String(),
"last_seen": node.LastSeen.Format(time.RFC3339),
})
}
status["nodes"] = nodes
return status
}
// addNode добавляет узел во внутреннюю карту
func (cm *ClusterManager) addNode(node *Node) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.nodes[node.ID] = node
atomic.AddInt64(&cm.stats.totalNodes, 1)
if node.State == StateOnline {
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
}