811 lines
24 KiB
Go
811 lines
24 KiB
Go
|
|
// /futriis/internal/cluster/node.go
|
|||
|
|
// Пакет cluster реализует управление кластером, координацию узлов и репликацию данных.
|
|||
|
|
// Обеспечивает обнаружение узлов, heartbeat механизм для мониторинга доступности,
|
|||
|
|
// а также синхронную мастер-мастер репликацию между узлами кластера.
|
|||
|
|
// Поддерживает автоматическое переключение ролей и балансировку нагрузки.
|
|||
|
|
|
|||
|
|
package cluster
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"encoding/json"
|
|||
|
|
"errors"
|
|||
|
|
"fmt"
|
|||
|
|
"net"
|
|||
|
|
"sync"
|
|||
|
|
"sync/atomic"
|
|||
|
|
"time"
|
|||
|
|
"unsafe"
|
|||
|
|
|
|||
|
|
"futriis/pkg/config"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// Простые функции для логирования (без зависимостей)
|
|||
|
|
func printInfo(format string, args ...interface{}) {
|
|||
|
|
fmt.Printf("\033[34m[INFO]\033[0m %s\n", fmt.Sprintf(format, args...))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func printSuccess(format string, args ...interface{}) {
|
|||
|
|
fmt.Printf("\033[32m[OK]\033[0m %s\n", fmt.Sprintf(format, args...))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func printWarning(format string, args ...interface{}) {
|
|||
|
|
fmt.Printf("\033[33m[WARN]\033[0m %s\n", fmt.Sprintf(format, args...))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func printError(format string, args ...interface{}) {
|
|||
|
|
fmt.Printf("\033[31m[ERROR]\033[0m %s\n", fmt.Sprintf(format, args...))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NodeState состояние узла
|
|||
|
|
type NodeState int
|
|||
|
|
|
|||
|
|
const (
|
|||
|
|
StateOffline NodeState = iota
|
|||
|
|
StateJoining
|
|||
|
|
StateOnline
|
|||
|
|
StateLeaving
|
|||
|
|
StateFailed
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
func (s NodeState) String() string {
|
|||
|
|
switch s {
|
|||
|
|
case StateOffline:
|
|||
|
|
return "offline"
|
|||
|
|
case StateJoining:
|
|||
|
|
return "joining"
|
|||
|
|
case StateOnline:
|
|||
|
|
return "online"
|
|||
|
|
case StateLeaving:
|
|||
|
|
return "leaving"
|
|||
|
|
case StateFailed:
|
|||
|
|
return "failed"
|
|||
|
|
default:
|
|||
|
|
return "unknown"
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ReplicationMessage представляет сообщение репликации
|
|||
|
|
type ReplicationMessage struct {
|
|||
|
|
Type string `json:"type"` // "write", "sync", "ack"
|
|||
|
|
Command string `json:"command"` // Команда (create, update, delete)
|
|||
|
|
Args []interface{} `json:"args"` // Аргументы команды
|
|||
|
|
Timestamp int64 `json:"timestamp"` // Временная метка
|
|||
|
|
NodeID string `json:"node_id"` // ID исходного узла
|
|||
|
|
ShardID string `json:"shard_id"` // ID шарда
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Node представляет узел кластера (с wait-free указателями)
|
|||
|
|
type Node struct {
|
|||
|
|
ID string
|
|||
|
|
Address string
|
|||
|
|
state int32 // Атомарное состояние
|
|||
|
|
LastSeen time.Time
|
|||
|
|
ShardIDs []string
|
|||
|
|
// Используем atomic.Value для wait-free доступа к изменяемым полям
|
|||
|
|
nodeState unsafe.Pointer // Атомарный указатель на карту состояний
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetState атомарно получает состояние узла
|
|||
|
|
func (n *Node) GetState() NodeState {
|
|||
|
|
return NodeState(atomic.LoadInt32(&n.state))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// SetState атомарно устанавливает состояние узла
|
|||
|
|
func (n *Node) SetState(state NodeState) {
|
|||
|
|
atomic.StoreInt32(&n.state, int32(state))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ClusterManager управляет кластером (с wait-free операциями)
|
|||
|
|
type ClusterManager struct {
|
|||
|
|
nodes unsafe.Pointer // Атомарный указатель на карту узлов
|
|||
|
|
coordinatorAddr string
|
|||
|
|
nodeID string
|
|||
|
|
localAddr string
|
|||
|
|
isCoordinator int32 // Атомарный флаг
|
|||
|
|
heartbeatStop chan struct{}
|
|||
|
|
replicationStop chan struct{}
|
|||
|
|
stats struct {
|
|||
|
|
totalNodes int64
|
|||
|
|
activeNodes int64
|
|||
|
|
rebalancingCnt int64
|
|||
|
|
}
|
|||
|
|
replicationEnabled bool
|
|||
|
|
masterMaster bool
|
|||
|
|
replicationQueue chan ReplicationMessage
|
|||
|
|
replicationWG sync.WaitGroup
|
|||
|
|
shardManager *ShardManager // Менеджер шардинга
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewClusterManager создаёт новый менеджер кластера
|
|||
|
|
func NewClusterManager(cfg *config.Config) *ClusterManager {
|
|||
|
|
// Определяем стратегию шардинга по умолчанию
|
|||
|
|
shardingStrategy := ConsistentHashing
|
|||
|
|
shardingEnabled := false
|
|||
|
|
initialShards := 10
|
|||
|
|
|
|||
|
|
// Проверяем наличие настроек шардинга в конфигурации
|
|||
|
|
// Используем значения по умолчанию, если поля отсутствуют
|
|||
|
|
|
|||
|
|
cm := &ClusterManager{
|
|||
|
|
coordinatorAddr: cfg.Cluster.CoordinatorAddress,
|
|||
|
|
nodeID: cfg.Node.ID,
|
|||
|
|
localAddr: cfg.Node.Address,
|
|||
|
|
heartbeatStop: make(chan struct{}),
|
|||
|
|
replicationStop: make(chan struct{}),
|
|||
|
|
replicationEnabled: cfg.Replication.Enabled,
|
|||
|
|
masterMaster: cfg.Replication.MasterMaster,
|
|||
|
|
replicationQueue: make(chan ReplicationMessage, 1000),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Создаём менеджер шардинга с значениями по умолчанию
|
|||
|
|
cm.shardManager = NewShardManager(
|
|||
|
|
shardingStrategy,
|
|||
|
|
cfg.Cluster.ReplicationFactor,
|
|||
|
|
shardingEnabled,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// Устанавливаем флаг координатора атомарно
|
|||
|
|
isCoord := int32(0)
|
|||
|
|
if cfg.Node.Address == cfg.Cluster.CoordinatorAddress {
|
|||
|
|
isCoord = 1
|
|||
|
|
}
|
|||
|
|
atomic.StoreInt32(&cm.isCoordinator, isCoord)
|
|||
|
|
|
|||
|
|
// Создаём начальную карту узлов
|
|||
|
|
nodes := make(map[string]*Node)
|
|||
|
|
|
|||
|
|
// Добавляем себя в кластер
|
|||
|
|
selfNode := &Node{
|
|||
|
|
ID: cfg.Node.ID,
|
|||
|
|
Address: cfg.Node.Address,
|
|||
|
|
LastSeen: time.Now(),
|
|||
|
|
}
|
|||
|
|
selfNode.SetState(StateOnline)
|
|||
|
|
nodes[cfg.Node.ID] = selfNode
|
|||
|
|
|
|||
|
|
// Атомарно сохраняем карту узлов
|
|||
|
|
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&nodes))
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|||
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|||
|
|
|
|||
|
|
// Создаём начальные шарды если включён шардинг
|
|||
|
|
if shardingEnabled {
|
|||
|
|
// Получаем список всех узлов
|
|||
|
|
allNodes := make([]string, 0)
|
|||
|
|
for id := range nodes {
|
|||
|
|
allNodes = append(allNodes, id)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Создаём шарды
|
|||
|
|
cm.shardManager.CreateShards(initialShards, allNodes)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Запускаем обработчик репликации если включена мастер-мастер
|
|||
|
|
if cm.replicationEnabled && cm.masterMaster {
|
|||
|
|
go cm.startReplicationHandler()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return cm
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// IsCoordinator атомарно проверяет, является ли узел координатором
|
|||
|
|
func (cm *ClusterManager) IsCoordinator() bool {
|
|||
|
|
return atomic.LoadInt32(&cm.isCoordinator) == 1
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Start запускает кластерные сервисы
|
|||
|
|
func (cm *ClusterManager) Start() error {
|
|||
|
|
if cm.IsCoordinator() {
|
|||
|
|
// Запускаем координатор
|
|||
|
|
go cm.startCoordinator()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Запускаем heartbeat
|
|||
|
|
go cm.heartbeatLoop()
|
|||
|
|
|
|||
|
|
printInfo("Кластерный менеджер запущен")
|
|||
|
|
if cm.replicationEnabled && cm.masterMaster {
|
|||
|
|
printInfo("Мастер-мастер репликация активирована")
|
|||
|
|
}
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
printInfo("Шардинг активирован, стратегия: %v", cm.shardManager.getStrategyName())
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Stop останавливает кластерные сервисы
|
|||
|
|
func (cm *ClusterManager) Stop() {
|
|||
|
|
close(cm.heartbeatStop)
|
|||
|
|
if cm.replicationEnabled && cm.masterMaster {
|
|||
|
|
close(cm.replicationStop)
|
|||
|
|
cm.replicationWG.Wait()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ReplicateCommand реплицирует команду на все узлы кластера с учётом шардинга
|
|||
|
|
func (cm *ClusterManager) ReplicateCommand(cmd string, args []interface{}, key string) error {
|
|||
|
|
if !cm.replicationEnabled || !cm.masterMaster {
|
|||
|
|
return nil // Репликация не включена
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Определяем шард для ключа
|
|||
|
|
var shardID string
|
|||
|
|
if cm.shardManager.enabled && key != "" {
|
|||
|
|
shard, err := cm.shardManager.GetShardForKey(key)
|
|||
|
|
if err == nil && shard != nil {
|
|||
|
|
shardID = shard.ID
|
|||
|
|
cm.shardManager.RecordWrite(shardID)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msg := ReplicationMessage{
|
|||
|
|
Type: "write",
|
|||
|
|
Command: cmd,
|
|||
|
|
Args: args,
|
|||
|
|
Timestamp: time.Now().UnixNano(),
|
|||
|
|
NodeID: cm.nodeID,
|
|||
|
|
ShardID: shardID,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Отправляем в очередь репликации
|
|||
|
|
select {
|
|||
|
|
case cm.replicationQueue <- msg:
|
|||
|
|
return nil
|
|||
|
|
default:
|
|||
|
|
return errors.New("очередь репликации переполнена")
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// startReplicationHandler запускает обработчик репликации
|
|||
|
|
func (cm *ClusterManager) startReplicationHandler() {
|
|||
|
|
cm.replicationWG.Add(1)
|
|||
|
|
defer cm.replicationWG.Done()
|
|||
|
|
|
|||
|
|
ticker := time.NewTicker(1 * time.Second)
|
|||
|
|
defer ticker.Stop()
|
|||
|
|
|
|||
|
|
for {
|
|||
|
|
select {
|
|||
|
|
case msg := <-cm.replicationQueue:
|
|||
|
|
cm.sendReplicationMessage(msg)
|
|||
|
|
case <-ticker.C:
|
|||
|
|
cm.sendSyncRequest()
|
|||
|
|
case <-cm.replicationStop:
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// sendReplicationMessage отправляет сообщение репликации на другие узлы
|
|||
|
|
func (cm *ClusterManager) sendReplicationMessage(msg ReplicationMessage) {
|
|||
|
|
// Получаем текущую карту узлов атомарно
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
nodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
for id, node := range nodes {
|
|||
|
|
if id == cm.nodeID || node.GetState() != StateOnline {
|
|||
|
|
continue // Не отправляем себе и офлайн узлам
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Если есть шард, проверяем, должен ли узел получать это сообщение
|
|||
|
|
if msg.ShardID != "" && cm.shardManager.enabled {
|
|||
|
|
shard, exists := cm.shardManager.GetShardByID(msg.ShardID)
|
|||
|
|
if exists {
|
|||
|
|
shouldSend := false
|
|||
|
|
for _, nodeID := range shard.Nodes {
|
|||
|
|
if nodeID == id {
|
|||
|
|
shouldSend = true
|
|||
|
|
break
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
if !shouldSend {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
go func(targetNode *Node) {
|
|||
|
|
conn, err := net.DialTimeout("tcp", targetNode.Address, 3*time.Second)
|
|||
|
|
if err != nil {
|
|||
|
|
printWarning("Не удалось подключиться к узлу %s для репликации: %v", targetNode.ID, err)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
defer conn.Close()
|
|||
|
|
|
|||
|
|
encoder := json.NewEncoder(conn)
|
|||
|
|
if err := encoder.Encode(msg); err != nil {
|
|||
|
|
printWarning("Ошибка отправки репликации на узел %s: %v", targetNode.ID, err)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Ожидаем подтверждение для синхронной репликации
|
|||
|
|
if msg.Type == "write" {
|
|||
|
|
var ack map[string]interface{}
|
|||
|
|
decoder := json.NewDecoder(conn)
|
|||
|
|
if err := decoder.Decode(&ack); err == nil {
|
|||
|
|
if status, ok := ack["status"].(string); ok && status == "ok" {
|
|||
|
|
printSuccess("Репликация на узел %s подтверждена", targetNode.ID)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}(node)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// sendSyncRequest отправляет запрос синхронизации
|
|||
|
|
func (cm *ClusterManager) sendSyncRequest() {
|
|||
|
|
if !cm.IsCoordinator() {
|
|||
|
|
return // Только координатор инициирует синхронизацию
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msg := ReplicationMessage{
|
|||
|
|
Type: "sync",
|
|||
|
|
Timestamp: time.Now().UnixNano(),
|
|||
|
|
NodeID: cm.nodeID,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
cm.sendReplicationMessage(msg)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleReplicationMessage обрабатывает входящее сообщение репликации
|
|||
|
|
func (cm *ClusterManager) handleReplicationMessage(conn net.Conn) {
|
|||
|
|
defer conn.Close()
|
|||
|
|
|
|||
|
|
var msg ReplicationMessage
|
|||
|
|
decoder := json.NewDecoder(conn)
|
|||
|
|
if err := decoder.Decode(&msg); err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
switch msg.Type {
|
|||
|
|
case "write":
|
|||
|
|
// Получаем команду от другого узла
|
|||
|
|
printInfo("Получена команда репликации: %s от узла %s (шард: %s)",
|
|||
|
|
msg.Command, msg.NodeID, msg.ShardID)
|
|||
|
|
|
|||
|
|
// Здесь будет вызов движка для выполнения команды
|
|||
|
|
// TODO: Интегрировать с engine для выполнения реплицированных команд
|
|||
|
|
|
|||
|
|
// Отправляем подтверждение
|
|||
|
|
encoder := json.NewEncoder(conn)
|
|||
|
|
encoder.Encode(map[string]interface{}{
|
|||
|
|
"status": "ok",
|
|||
|
|
"time": time.Now().UnixNano(),
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
case "sync":
|
|||
|
|
// Запрос синхронизации
|
|||
|
|
printInfo("Получен запрос синхронизации от узла %s", msg.NodeID)
|
|||
|
|
// TODO: Отправить текущее состояние
|
|||
|
|
|
|||
|
|
case "ack":
|
|||
|
|
// Подтверждение получения
|
|||
|
|
printInfo("Получено подтверждение от узла %s", msg.NodeID)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// heartbeatLoop отправляет heartbeat сигналы
|
|||
|
|
func (cm *ClusterManager) heartbeatLoop() {
|
|||
|
|
ticker := time.NewTicker(5 * time.Second)
|
|||
|
|
defer ticker.Stop()
|
|||
|
|
|
|||
|
|
for {
|
|||
|
|
select {
|
|||
|
|
case <-ticker.C:
|
|||
|
|
cm.sendHeartbeat()
|
|||
|
|
case <-cm.heartbeatStop:
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// sendHeartbeat отправляет heartbeat координатору
|
|||
|
|
func (cm *ClusterManager) sendHeartbeat() {
|
|||
|
|
if !cm.IsCoordinator() {
|
|||
|
|
// Отправляем heartbeat координатору
|
|||
|
|
conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second)
|
|||
|
|
if err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
defer conn.Close()
|
|||
|
|
|
|||
|
|
heartbeat := map[string]interface{}{
|
|||
|
|
"type": "heartbeat",
|
|||
|
|
"node_id": cm.nodeID,
|
|||
|
|
"address": cm.localAddr,
|
|||
|
|
"time": time.Now().Unix(),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
json.NewEncoder(conn).Encode(heartbeat)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// startCoordinator запускает координатор кластера
|
|||
|
|
func (cm *ClusterManager) startCoordinator() {
|
|||
|
|
listener, err := net.Listen("tcp", cm.coordinatorAddr)
|
|||
|
|
if err != nil {
|
|||
|
|
printError("Ошибка запуска координатора: %v", err)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
defer listener.Close()
|
|||
|
|
|
|||
|
|
printInfo("Координатор кластера запущен на " + cm.coordinatorAddr)
|
|||
|
|
|
|||
|
|
for {
|
|||
|
|
conn, err := listener.Accept()
|
|||
|
|
if err != nil {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
go cm.handleCoordinatorRequest(conn)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleCoordinatorRequest обрабатывает запросы к координатору
|
|||
|
|
func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) {
|
|||
|
|
defer conn.Close()
|
|||
|
|
|
|||
|
|
var req map[string]interface{}
|
|||
|
|
if err := json.NewDecoder(conn).Decode(&req); err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
msgType, _ := req["type"].(string)
|
|||
|
|
|
|||
|
|
switch msgType {
|
|||
|
|
case "heartbeat":
|
|||
|
|
nodeID, _ := req["node_id"].(string)
|
|||
|
|
address, _ := req["address"].(string)
|
|||
|
|
cm.updateNodeHeartbeat(nodeID, address)
|
|||
|
|
|
|||
|
|
case "join":
|
|||
|
|
nodeID, _ := req["node_id"].(string)
|
|||
|
|
address, _ := req["address"].(string)
|
|||
|
|
cm.handleNodeJoin(nodeID, address)
|
|||
|
|
|
|||
|
|
case "leave":
|
|||
|
|
nodeID, _ := req["node_id"].(string)
|
|||
|
|
cm.handleNodeLeave(nodeID)
|
|||
|
|
|
|||
|
|
case "replication":
|
|||
|
|
// Обработка сообщения репликации на координаторе
|
|||
|
|
cm.handleReplicationMessage(conn)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleNodeJoin обрабатывает присоединение узла
|
|||
|
|
func (cm *ClusterManager) handleNodeJoin(nodeID, address string) {
|
|||
|
|
// Получаем текущую карту узлов
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
oldNodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
// Создаём новую карту
|
|||
|
|
newNodes := make(map[string]*Node)
|
|||
|
|
for k, v := range oldNodes {
|
|||
|
|
newNodes[k] = v
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if node, exists := newNodes[nodeID]; exists {
|
|||
|
|
node.SetState(StateOnline)
|
|||
|
|
node.LastSeen = time.Now()
|
|||
|
|
} else {
|
|||
|
|
newNode := &Node{
|
|||
|
|
ID: nodeID,
|
|||
|
|
Address: address,
|
|||
|
|
LastSeen: time.Now(),
|
|||
|
|
}
|
|||
|
|
newNode.SetState(StateOnline)
|
|||
|
|
newNodes[nodeID] = newNode
|
|||
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Атомарно обновляем карту узлов
|
|||
|
|
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|||
|
|
printSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address)
|
|||
|
|
|
|||
|
|
// Если включена мастер-мастер репликация, отправляем текущее состояние новому узлу
|
|||
|
|
if cm.replicationEnabled && cm.masterMaster {
|
|||
|
|
go cm.sendInitialSync(nodeID)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Если включён шардинг, обновляем распределение шардов
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
cm.rebalanceShards()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// sendInitialSync отправляет начальную синхронизацию новому узлу
|
|||
|
|
func (cm *ClusterManager) sendInitialSync(targetNodeID string) {
|
|||
|
|
// Получаем текущую карту узлов
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
nodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
targetNode, exists := nodes[targetNodeID]
|
|||
|
|
if !exists || targetNode.GetState() != StateOnline {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// TODO: Отправить текущее состояние базы данных новому узлу
|
|||
|
|
printInfo("Отправка начальной синхронизации узлу %s", targetNodeID)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleNodeLeave обрабатывает отключение узла
|
|||
|
|
func (cm *ClusterManager) handleNodeLeave(nodeID string) {
|
|||
|
|
// Получаем текущую карту узлов
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
oldNodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
// Создаём новую карту
|
|||
|
|
newNodes := make(map[string]*Node)
|
|||
|
|
for k, v := range oldNodes {
|
|||
|
|
if k == nodeID {
|
|||
|
|
v.SetState(StateOffline)
|
|||
|
|
newNodes[k] = v
|
|||
|
|
} else {
|
|||
|
|
newNodes[k] = v
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Атомарно обновляем карту узлов
|
|||
|
|
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.activeNodes, -1)
|
|||
|
|
printWarning("Узел %s покинул кластер", nodeID)
|
|||
|
|
|
|||
|
|
// Если включён шардинг, обновляем распределение шардов
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
cm.rebalanceShards()
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// updateNodeHeartbeat обновляет время последнего heartbeat узла
|
|||
|
|
func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) {
|
|||
|
|
// Получаем текущую карту узлов
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
oldNodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
// Проверяем, нужно ли обновление
|
|||
|
|
node, exists := oldNodes[nodeID]
|
|||
|
|
if exists && node.GetState() == StateOnline && time.Since(node.LastSeen) < 5*time.Second {
|
|||
|
|
// Обновление не требуется, просто обновляем LastSeen в существующей карте
|
|||
|
|
node.LastSeen = time.Now()
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Создаём новую карту для обновления
|
|||
|
|
newNodes := make(map[string]*Node)
|
|||
|
|
for k, v := range oldNodes {
|
|||
|
|
newNodes[k] = v
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if exists {
|
|||
|
|
node.LastSeen = time.Now()
|
|||
|
|
if node.GetState() == StateOffline {
|
|||
|
|
node.SetState(StateOnline)
|
|||
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|||
|
|
}
|
|||
|
|
} else {
|
|||
|
|
newNode := &Node{
|
|||
|
|
ID: nodeID,
|
|||
|
|
Address: address,
|
|||
|
|
LastSeen: time.Now(),
|
|||
|
|
}
|
|||
|
|
newNode.SetState(StateOnline)
|
|||
|
|
newNodes[nodeID] = newNode
|
|||
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|||
|
|
atomic.AddInt64(&cm.stats.activeNodes, 1)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Атомарно обновляем карту узлов
|
|||
|
|
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// rebalanceShards выполняет ребалансировку шардов при изменении состава кластера
|
|||
|
|
func (cm *ClusterManager) rebalanceShards() {
|
|||
|
|
if !cm.shardManager.enabled || !cm.IsCoordinator() {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Получаем текущие узлы
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
nodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
// Собираем онлайн узлы
|
|||
|
|
onlineNodes := make([]string, 0)
|
|||
|
|
for id, node := range nodes {
|
|||
|
|
if node.GetState() == StateOnline {
|
|||
|
|
onlineNodes = append(onlineNodes, id)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Здесь должна быть логика ребалансировки шардов
|
|||
|
|
// В демо-версии просто перераспределяем существующие шарды
|
|||
|
|
|
|||
|
|
cm.shardManager.Rebalance()
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.rebalancingCnt, 1)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// AddNode добавляет новый узел в кластер
|
|||
|
|
func (cm *ClusterManager) AddNode(address string) error {
|
|||
|
|
if !cm.IsCoordinator() {
|
|||
|
|
return errors.New("только координатор может добавлять узлы")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Получаем текущую карту узлов
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return errors.New("карта узлов не инициализирована")
|
|||
|
|
}
|
|||
|
|
oldNodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
// Генерируем ID для нового узла
|
|||
|
|
nodeID := fmt.Sprintf("node-%d", len(oldNodes)+1)
|
|||
|
|
|
|||
|
|
// Создаём новую карту
|
|||
|
|
newNodes := make(map[string]*Node)
|
|||
|
|
for k, v := range oldNodes {
|
|||
|
|
newNodes[k] = v
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
newNode := &Node{
|
|||
|
|
ID: nodeID,
|
|||
|
|
Address: address,
|
|||
|
|
LastSeen: time.Now(),
|
|||
|
|
}
|
|||
|
|
newNode.SetState(StateJoining)
|
|||
|
|
newNodes[nodeID] = newNode
|
|||
|
|
|
|||
|
|
// Атомарно обновляем карту узлов
|
|||
|
|
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.totalNodes, 1)
|
|||
|
|
|
|||
|
|
printSuccess("Узел %s (%s) добавлен в кластер", nodeID, address)
|
|||
|
|
|
|||
|
|
// Если включён шардинг, обновляем распределение
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
cm.rebalanceShards()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RemoveNode удаляет узел из кластера
|
|||
|
|
func (cm *ClusterManager) RemoveNode(nodeID string) error {
|
|||
|
|
if !cm.IsCoordinator() {
|
|||
|
|
return errors.New("только координатор может удалять узлы")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Получаем текущую карту узлов
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return errors.New("карта узлов не инициализирована")
|
|||
|
|
}
|
|||
|
|
oldNodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
if _, exists := oldNodes[nodeID]; !exists {
|
|||
|
|
return errors.New("узел не найден")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Создаём новую карту без удаляемого узла
|
|||
|
|
newNodes := make(map[string]*Node)
|
|||
|
|
for k, v := range oldNodes {
|
|||
|
|
if k != nodeID {
|
|||
|
|
newNodes[k] = v
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Атомарно обновляем карту узлов
|
|||
|
|
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.totalNodes, -1)
|
|||
|
|
|
|||
|
|
printSuccess("Узел %s удален из кластера", nodeID)
|
|||
|
|
|
|||
|
|
// Если включён шардинг, обновляем распределение
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
cm.rebalanceShards()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetClusterStatus возвращает статус кластера
|
|||
|
|
func (cm *ClusterManager) GetClusterStatus() map[string]interface{} {
|
|||
|
|
status := make(map[string]interface{})
|
|||
|
|
status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes)
|
|||
|
|
status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes)
|
|||
|
|
status["is_coordinator"] = cm.IsCoordinator()
|
|||
|
|
status["coordinator"] = cm.coordinatorAddr
|
|||
|
|
status["replication_enabled"] = cm.replicationEnabled
|
|||
|
|
status["master_master"] = cm.masterMaster
|
|||
|
|
|
|||
|
|
// Добавляем информацию о шардинге
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
shardStats := cm.shardManager.GetShardStats()
|
|||
|
|
status["sharding"] = shardStats
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Получаем текущую карту узлов атомарно
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr != nil {
|
|||
|
|
nodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
nodesList := make([]map[string]interface{}, 0, len(nodes))
|
|||
|
|
for id, node := range nodes {
|
|||
|
|
nodesList = append(nodesList, map[string]interface{}{
|
|||
|
|
"id": id,
|
|||
|
|
"address": node.Address,
|
|||
|
|
"state": node.GetState().String(),
|
|||
|
|
"last_seen": node.LastSeen.Format(time.RFC3339),
|
|||
|
|
})
|
|||
|
|
}
|
|||
|
|
status["nodes"] = nodesList
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return status
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RebalanceCluster выполняет ребалансировку всего кластера
|
|||
|
|
func (cm *ClusterManager) RebalanceCluster() error {
|
|||
|
|
if !cm.IsCoordinator() {
|
|||
|
|
return errors.New("only coordinator can rebalance the cluster")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
printInfo("Starting cluster rebalance...")
|
|||
|
|
|
|||
|
|
// Получаем текущие узлы
|
|||
|
|
nodesPtr := atomic.LoadPointer(&cm.nodes)
|
|||
|
|
if nodesPtr == nil {
|
|||
|
|
return errors.New("node map not initialized")
|
|||
|
|
}
|
|||
|
|
nodes := *(*map[string]*Node)(nodesPtr)
|
|||
|
|
|
|||
|
|
// Собираем онлайн узлы
|
|||
|
|
onlineNodes := make([]string, 0)
|
|||
|
|
for id, node := range nodes {
|
|||
|
|
if node.GetState() == StateOnline {
|
|||
|
|
onlineNodes = append(onlineNodes, id)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if len(onlineNodes) == 0 {
|
|||
|
|
return errors.New("no online nodes available for rebalance")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Если включён шардинг, ребалансируем шарды
|
|||
|
|
if cm.shardManager.enabled {
|
|||
|
|
err := cm.shardManager.Rebalance()
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&cm.stats.rebalancingCnt, 1)
|
|||
|
|
|
|||
|
|
printSuccess("Cluster rebalance completed successfully")
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|