337 lines
8.4 KiB
Go
337 lines
8.4 KiB
Go
// /futriis/internal/cluster/node.go
|
|
// Пакет cluster реализует управление кластером и репликацию
|
|
package cluster
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"futriis/pkg/config"
|
|
"futriis/pkg/utils"
|
|
)
|
|
// NodeState состояние узла
|
|
type NodeState int
|
|
|
|
const (
|
|
StateOffline NodeState = iota
|
|
StateJoining
|
|
StateOnline
|
|
StateLeaving
|
|
StateFailed
|
|
)
|
|
|
|
func (s NodeState) String() string {
|
|
switch s {
|
|
case StateOffline:
|
|
return "offline"
|
|
case StateJoining:
|
|
return "joining"
|
|
case StateOnline:
|
|
return "online"
|
|
case StateLeaving:
|
|
return "leaving"
|
|
case StateFailed:
|
|
return "failed"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// Node представляет узел кластера
|
|
type Node struct {
|
|
ID string `json:"id"`
|
|
Address string `json:"address"`
|
|
State NodeState `json:"state"`
|
|
LastSeen time.Time `json:"last_seen"`
|
|
ShardIDs []string `json:"shard_ids"`
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// ClusterManager управляет кластером
|
|
type ClusterManager struct {
|
|
nodes map[string]*Node
|
|
coordinatorAddr string
|
|
nodeID string
|
|
localAddr string
|
|
isCoordinator bool
|
|
mu sync.RWMutex
|
|
heartbeatStop chan struct{}
|
|
stats struct {
|
|
totalNodes int64
|
|
activeNodes int64
|
|
rebalancingCnt int64
|
|
}
|
|
}
|
|
|
|
// NewClusterManager создаёт новый менеджер кластера
|
|
func NewClusterManager(cfg *config.Config) *ClusterManager {
|
|
cm := &ClusterManager{
|
|
nodes: make(map[string]*Node),
|
|
coordinatorAddr: cfg.Cluster.CoordinatorAddress,
|
|
nodeID: cfg.Node.ID,
|
|
localAddr: cfg.Node.Address,
|
|
isCoordinator: cfg.Node.Address == cfg.Cluster.CoordinatorAddress,
|
|
heartbeatStop: make(chan struct{}),
|
|
}
|
|
|
|
// Добавляем себя в кластер
|
|
cm.addNode(&Node{
|
|
ID: cfg.Node.ID,
|
|
Address: cfg.Node.Address,
|
|
State: StateOnline,
|
|
LastSeen: time.Now(),
|
|
})
|
|
|
|
return cm
|
|
}
|
|
|
|
// Start запускает кластерные сервисы
|
|
func (cm *ClusterManager) Start() error {
|
|
if cm.isCoordinator {
|
|
// Запускаем координатор
|
|
go cm.startCoordinator()
|
|
}
|
|
|
|
// Запускаем heartbeat
|
|
go cm.heartbeatLoop()
|
|
|
|
utils.PrintInfo("Кластерный менеджер запущен")
|
|
return nil
|
|
}
|
|
|
|
// Stop останавливает кластерные сервисы
|
|
func (cm *ClusterManager) Stop() {
|
|
close(cm.heartbeatStop)
|
|
}
|
|
|
|
// heartbeatLoop отправляет heartbeat сигналы
|
|
func (cm *ClusterManager) heartbeatLoop() {
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
cm.sendHeartbeat()
|
|
case <-cm.heartbeatStop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendHeartbeat отправляет heartbeat координатору
|
|
func (cm *ClusterManager) sendHeartbeat() {
|
|
if !cm.isCoordinator {
|
|
// Отправляем heartbeat координатору
|
|
conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
heartbeat := map[string]interface{}{
|
|
"type": "heartbeat",
|
|
"node_id": cm.nodeID,
|
|
"address": cm.localAddr,
|
|
"time": time.Now().Unix(),
|
|
}
|
|
|
|
json.NewEncoder(conn).Encode(heartbeat)
|
|
}
|
|
}
|
|
|
|
// startCoordinator запускает координатор кластера
|
|
func (cm *ClusterManager) startCoordinator() {
|
|
listener, err := net.Listen("tcp", cm.coordinatorAddr)
|
|
if err != nil {
|
|
utils.PrintError("Ошибка запуска координатора: %v", err)
|
|
return
|
|
}
|
|
defer listener.Close()
|
|
|
|
utils.PrintInfo("Координатор кластера запущен на " + cm.coordinatorAddr)
|
|
|
|
for {
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
go cm.handleCoordinatorRequest(conn)
|
|
}
|
|
}
|
|
|
|
// handleCoordinatorRequest обрабатывает запросы к координатору
|
|
func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) {
|
|
defer conn.Close()
|
|
|
|
var req map[string]interface{}
|
|
if err := json.NewDecoder(conn).Decode(&req); err != nil {
|
|
return
|
|
}
|
|
|
|
msgType, _ := req["type"].(string)
|
|
|
|
switch msgType {
|
|
case "heartbeat":
|
|
nodeID, _ := req["node_id"].(string)
|
|
address, _ := req["address"].(string)
|
|
cm.updateNodeHeartbeat(nodeID, address)
|
|
|
|
case "join":
|
|
nodeID, _ := req["node_id"].(string)
|
|
address, _ := req["address"].(string)
|
|
cm.handleNodeJoin(nodeID, address)
|
|
|
|
case "leave":
|
|
nodeID, _ := req["node_id"].(string)
|
|
cm.handleNodeLeave(nodeID)
|
|
}
|
|
}
|
|
|
|
// handleNodeJoin обрабатывает присоединение узла
|
|
func (cm *ClusterManager) handleNodeJoin(nodeID, address string) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if node, exists := cm.nodes[nodeID]; exists {
|
|
node.State = StateOnline
|
|
node.LastSeen = time.Now()
|
|
} else {
|
|
cm.nodes[nodeID] = &Node{
|
|
ID: nodeID,
|
|
Address: address,
|
|
State: StateOnline,
|
|
LastSeen: time.Now(),
|
|
}
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|
}
|
|
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|
utils.PrintSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address)
|
|
}
|
|
|
|
// handleNodeLeave обрабатывает отключение узла
|
|
func (cm *ClusterManager) handleNodeLeave(nodeID string) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if node, exists := cm.nodes[nodeID]; exists {
|
|
node.State = StateOffline
|
|
atomic.AddInt64(&cm.stats.activeNodes, -1)
|
|
utils.PrintWarning("Узел %s покинул кластер", nodeID)
|
|
}
|
|
}
|
|
|
|
// updateNodeHeartbeat обновляет время последнего heartbeat узла
|
|
func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if node, exists := cm.nodes[nodeID]; exists {
|
|
node.LastSeen = time.Now()
|
|
if node.State == StateOffline {
|
|
node.State = StateOnline
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|
}
|
|
} else {
|
|
cm.nodes[nodeID] = &Node{
|
|
ID: nodeID,
|
|
Address: address,
|
|
State: StateOnline,
|
|
LastSeen: time.Now(),
|
|
}
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|
}
|
|
}
|
|
|
|
// AddNode добавляет новый узел в кластер
|
|
func (cm *ClusterManager) AddNode(address string) error {
|
|
if !cm.isCoordinator {
|
|
return errors.New("только координатор может добавлять узлы")
|
|
}
|
|
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
// Генерируем ID для нового узла
|
|
nodeID := fmt.Sprintf("node-%d", len(cm.nodes)+1)
|
|
|
|
cm.nodes[nodeID] = &Node{
|
|
ID: nodeID,
|
|
Address: address,
|
|
State: StateJoining,
|
|
LastSeen: time.Now(),
|
|
}
|
|
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|
|
|
utils.PrintSuccess("Узел %s (%s) добавлен в кластер", nodeID, address)
|
|
return nil
|
|
}
|
|
|
|
// RemoveNode удаляет узел из кластера
|
|
func (cm *ClusterManager) RemoveNode(nodeID string) error {
|
|
if !cm.isCoordinator {
|
|
return errors.New("только координатор может удалять узлы")
|
|
}
|
|
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if node, exists := cm.nodes[nodeID]; exists {
|
|
node.State = StateLeaving
|
|
delete(cm.nodes, nodeID)
|
|
atomic.AddInt64(&cm.stats.totalNodes, -1)
|
|
if node.State == StateOnline {
|
|
atomic.AddInt64(&cm.stats.activeNodes, -1)
|
|
}
|
|
utils.PrintSuccess("Узел %s удален из кластера", nodeID)
|
|
return nil
|
|
}
|
|
|
|
return errors.New("узел не найден")
|
|
}
|
|
|
|
// GetClusterStatus возвращает статус кластера
|
|
func (cm *ClusterManager) GetClusterStatus() map[string]interface{} {
|
|
cm.mu.RLock()
|
|
defer cm.mu.RUnlock()
|
|
|
|
status := make(map[string]interface{})
|
|
status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes)
|
|
status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes)
|
|
status["is_coordinator"] = cm.isCoordinator
|
|
status["coordinator"] = cm.coordinatorAddr
|
|
|
|
nodes := make([]map[string]interface{}, 0, len(cm.nodes))
|
|
for id, node := range cm.nodes {
|
|
nodes = append(nodes, map[string]interface{}{
|
|
"id": id,
|
|
"address": node.Address,
|
|
"state": node.State.String(),
|
|
"last_seen": node.LastSeen.Format(time.RFC3339),
|
|
})
|
|
}
|
|
status["nodes"] = nodes
|
|
|
|
return status
|
|
}
|
|
|
|
// addNode добавляет узел во внутреннюю карту
|
|
func (cm *ClusterManager) addNode(node *Node) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.nodes[node.ID] = node
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|
if node.State == StateOnline {
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|
}
|
|
}
|