2026-02-27 22:04:04 +03:00

811 lines
24 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// /futriis/internal/cluster/node.go
// Пакет cluster реализует управление кластером, координацию узлов и репликацию данных.
// Обеспечивает обнаружение узлов, heartbeat механизм для мониторинга доступности,
// а также синхронную мастер-мастер репликацию между узлами кластера.
// Поддерживает автоматическое переключение ролей и балансировку нагрузки.
package cluster
import (
"encoding/json"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
"futriis/pkg/config"
)
// Простые функции для логирования (без зависимостей)
func printInfo(format string, args ...interface{}) {
fmt.Printf("\033[34m[INFO]\033[0m %s\n", fmt.Sprintf(format, args...))
}
func printSuccess(format string, args ...interface{}) {
fmt.Printf("\033[32m[OK]\033[0m %s\n", fmt.Sprintf(format, args...))
}
func printWarning(format string, args ...interface{}) {
fmt.Printf("\033[33m[WARN]\033[0m %s\n", fmt.Sprintf(format, args...))
}
func printError(format string, args ...interface{}) {
fmt.Printf("\033[31m[ERROR]\033[0m %s\n", fmt.Sprintf(format, args...))
}
// NodeState состояние узла
type NodeState int
const (
StateOffline NodeState = iota
StateJoining
StateOnline
StateLeaving
StateFailed
)
func (s NodeState) String() string {
switch s {
case StateOffline:
return "offline"
case StateJoining:
return "joining"
case StateOnline:
return "online"
case StateLeaving:
return "leaving"
case StateFailed:
return "failed"
default:
return "unknown"
}
}
// ReplicationMessage представляет сообщение репликации
type ReplicationMessage struct {
Type string `json:"type"` // "write", "sync", "ack"
Command string `json:"command"` // Команда (create, update, delete)
Args []interface{} `json:"args"` // Аргументы команды
Timestamp int64 `json:"timestamp"` // Временная метка
NodeID string `json:"node_id"` // ID исходного узла
ShardID string `json:"shard_id"` // ID шарда
}
// Node представляет узел кластера (с wait-free указателями)
type Node struct {
ID string
Address string
state int32 // Атомарное состояние
LastSeen time.Time
ShardIDs []string
// Используем atomic.Value для wait-free доступа к изменяемым полям
nodeState unsafe.Pointer // Атомарный указатель на карту состояний
}
// GetState атомарно получает состояние узла
func (n *Node) GetState() NodeState {
return NodeState(atomic.LoadInt32(&n.state))
}
// SetState атомарно устанавливает состояние узла
func (n *Node) SetState(state NodeState) {
atomic.StoreInt32(&n.state, int32(state))
}
// ClusterManager управляет кластером (с wait-free операциями)
type ClusterManager struct {
nodes unsafe.Pointer // Атомарный указатель на карту узлов
coordinatorAddr string
nodeID string
localAddr string
isCoordinator int32 // Атомарный флаг
heartbeatStop chan struct{}
replicationStop chan struct{}
stats struct {
totalNodes int64
activeNodes int64
rebalancingCnt int64
}
replicationEnabled bool
masterMaster bool
replicationQueue chan ReplicationMessage
replicationWG sync.WaitGroup
shardManager *ShardManager // Менеджер шардинга
}
// NewClusterManager создаёт новый менеджер кластера
func NewClusterManager(cfg *config.Config) *ClusterManager {
// Определяем стратегию шардинга по умолчанию
shardingStrategy := ConsistentHashing
shardingEnabled := false
initialShards := 10
// Проверяем наличие настроек шардинга в конфигурации
// Используем значения по умолчанию, если поля отсутствуют
cm := &ClusterManager{
coordinatorAddr: cfg.Cluster.CoordinatorAddress,
nodeID: cfg.Node.ID,
localAddr: cfg.Node.Address,
heartbeatStop: make(chan struct{}),
replicationStop: make(chan struct{}),
replicationEnabled: cfg.Replication.Enabled,
masterMaster: cfg.Replication.MasterMaster,
replicationQueue: make(chan ReplicationMessage, 1000),
}
// Создаём менеджер шардинга с значениями по умолчанию
cm.shardManager = NewShardManager(
shardingStrategy,
cfg.Cluster.ReplicationFactor,
shardingEnabled,
)
// Устанавливаем флаг координатора атомарно
isCoord := int32(0)
if cfg.Node.Address == cfg.Cluster.CoordinatorAddress {
isCoord = 1
}
atomic.StoreInt32(&cm.isCoordinator, isCoord)
// Создаём начальную карту узлов
nodes := make(map[string]*Node)
// Добавляем себя в кластер
selfNode := &Node{
ID: cfg.Node.ID,
Address: cfg.Node.Address,
LastSeen: time.Now(),
}
selfNode.SetState(StateOnline)
nodes[cfg.Node.ID] = selfNode
// Атомарно сохраняем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&nodes))
atomic.AddInt64(&cm.stats.totalNodes, 1)
atomic.AddInt64(&cm.stats.activeNodes, 1)
// Создаём начальные шарды если включён шардинг
if shardingEnabled {
// Получаем список всех узлов
allNodes := make([]string, 0)
for id := range nodes {
allNodes = append(allNodes, id)
}
// Создаём шарды
cm.shardManager.CreateShards(initialShards, allNodes)
}
// Запускаем обработчик репликации если включена мастер-мастер
if cm.replicationEnabled && cm.masterMaster {
go cm.startReplicationHandler()
}
return cm
}
// IsCoordinator атомарно проверяет, является ли узел координатором
func (cm *ClusterManager) IsCoordinator() bool {
return atomic.LoadInt32(&cm.isCoordinator) == 1
}
// Start запускает кластерные сервисы
func (cm *ClusterManager) Start() error {
if cm.IsCoordinator() {
// Запускаем координатор
go cm.startCoordinator()
}
// Запускаем heartbeat
go cm.heartbeatLoop()
printInfo("Кластерный менеджер запущен")
if cm.replicationEnabled && cm.masterMaster {
printInfo("Мастер-мастер репликация активирована")
}
if cm.shardManager.enabled {
printInfo("Шардинг активирован, стратегия: %v", cm.shardManager.getStrategyName())
}
return nil
}
// Stop останавливает кластерные сервисы
func (cm *ClusterManager) Stop() {
close(cm.heartbeatStop)
if cm.replicationEnabled && cm.masterMaster {
close(cm.replicationStop)
cm.replicationWG.Wait()
}
}
// ReplicateCommand реплицирует команду на все узлы кластера с учётом шардинга
func (cm *ClusterManager) ReplicateCommand(cmd string, args []interface{}, key string) error {
if !cm.replicationEnabled || !cm.masterMaster {
return nil // Репликация не включена
}
// Определяем шард для ключа
var shardID string
if cm.shardManager.enabled && key != "" {
shard, err := cm.shardManager.GetShardForKey(key)
if err == nil && shard != nil {
shardID = shard.ID
cm.shardManager.RecordWrite(shardID)
}
}
msg := ReplicationMessage{
Type: "write",
Command: cmd,
Args: args,
Timestamp: time.Now().UnixNano(),
NodeID: cm.nodeID,
ShardID: shardID,
}
// Отправляем в очередь репликации
select {
case cm.replicationQueue <- msg:
return nil
default:
return errors.New("очередь репликации переполнена")
}
}
// startReplicationHandler запускает обработчик репликации
func (cm *ClusterManager) startReplicationHandler() {
cm.replicationWG.Add(1)
defer cm.replicationWG.Done()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case msg := <-cm.replicationQueue:
cm.sendReplicationMessage(msg)
case <-ticker.C:
cm.sendSyncRequest()
case <-cm.replicationStop:
return
}
}
}
// sendReplicationMessage отправляет сообщение репликации на другие узлы
func (cm *ClusterManager) sendReplicationMessage(msg ReplicationMessage) {
// Получаем текущую карту узлов атомарно
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
nodes := *(*map[string]*Node)(nodesPtr)
for id, node := range nodes {
if id == cm.nodeID || node.GetState() != StateOnline {
continue // Не отправляем себе и офлайн узлам
}
// Если есть шард, проверяем, должен ли узел получать это сообщение
if msg.ShardID != "" && cm.shardManager.enabled {
shard, exists := cm.shardManager.GetShardByID(msg.ShardID)
if exists {
shouldSend := false
for _, nodeID := range shard.Nodes {
if nodeID == id {
shouldSend = true
break
}
}
if !shouldSend {
continue
}
}
}
go func(targetNode *Node) {
conn, err := net.DialTimeout("tcp", targetNode.Address, 3*time.Second)
if err != nil {
printWarning("Не удалось подключиться к узлу %s для репликации: %v", targetNode.ID, err)
return
}
defer conn.Close()
encoder := json.NewEncoder(conn)
if err := encoder.Encode(msg); err != nil {
printWarning("Ошибка отправки репликации на узел %s: %v", targetNode.ID, err)
return
}
// Ожидаем подтверждение для синхронной репликации
if msg.Type == "write" {
var ack map[string]interface{}
decoder := json.NewDecoder(conn)
if err := decoder.Decode(&ack); err == nil {
if status, ok := ack["status"].(string); ok && status == "ok" {
printSuccess("Репликация на узел %s подтверждена", targetNode.ID)
}
}
}
}(node)
}
}
// sendSyncRequest отправляет запрос синхронизации
func (cm *ClusterManager) sendSyncRequest() {
if !cm.IsCoordinator() {
return // Только координатор инициирует синхронизацию
}
msg := ReplicationMessage{
Type: "sync",
Timestamp: time.Now().UnixNano(),
NodeID: cm.nodeID,
}
cm.sendReplicationMessage(msg)
}
// handleReplicationMessage обрабатывает входящее сообщение репликации
func (cm *ClusterManager) handleReplicationMessage(conn net.Conn) {
defer conn.Close()
var msg ReplicationMessage
decoder := json.NewDecoder(conn)
if err := decoder.Decode(&msg); err != nil {
return
}
switch msg.Type {
case "write":
// Получаем команду от другого узла
printInfo("Получена команда репликации: %s от узла %s (шард: %s)",
msg.Command, msg.NodeID, msg.ShardID)
// Здесь будет вызов движка для выполнения команды
// TODO: Интегрировать с engine для выполнения реплицированных команд
// Отправляем подтверждение
encoder := json.NewEncoder(conn)
encoder.Encode(map[string]interface{}{
"status": "ok",
"time": time.Now().UnixNano(),
})
case "sync":
// Запрос синхронизации
printInfo("Получен запрос синхронизации от узла %s", msg.NodeID)
// TODO: Отправить текущее состояние
case "ack":
// Подтверждение получения
printInfo("Получено подтверждение от узла %s", msg.NodeID)
}
}
// heartbeatLoop отправляет heartbeat сигналы
func (cm *ClusterManager) heartbeatLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cm.sendHeartbeat()
case <-cm.heartbeatStop:
return
}
}
}
// sendHeartbeat отправляет heartbeat координатору
func (cm *ClusterManager) sendHeartbeat() {
if !cm.IsCoordinator() {
// Отправляем heartbeat координатору
conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second)
if err != nil {
return
}
defer conn.Close()
heartbeat := map[string]interface{}{
"type": "heartbeat",
"node_id": cm.nodeID,
"address": cm.localAddr,
"time": time.Now().Unix(),
}
json.NewEncoder(conn).Encode(heartbeat)
}
}
// startCoordinator запускает координатор кластера
func (cm *ClusterManager) startCoordinator() {
listener, err := net.Listen("tcp", cm.coordinatorAddr)
if err != nil {
printError("Ошибка запуска координатора: %v", err)
return
}
defer listener.Close()
printInfo("Координатор кластера запущен на " + cm.coordinatorAddr)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go cm.handleCoordinatorRequest(conn)
}
}
// handleCoordinatorRequest обрабатывает запросы к координатору
func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) {
defer conn.Close()
var req map[string]interface{}
if err := json.NewDecoder(conn).Decode(&req); err != nil {
return
}
msgType, _ := req["type"].(string)
switch msgType {
case "heartbeat":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.updateNodeHeartbeat(nodeID, address)
case "join":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.handleNodeJoin(nodeID, address)
case "leave":
nodeID, _ := req["node_id"].(string)
cm.handleNodeLeave(nodeID)
case "replication":
// Обработка сообщения репликации на координаторе
cm.handleReplicationMessage(conn)
}
}
// handleNodeJoin обрабатывает присоединение узла
func (cm *ClusterManager) handleNodeJoin(nodeID, address string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Создаём новую карту
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
newNodes[k] = v
}
if node, exists := newNodes[nodeID]; exists {
node.SetState(StateOnline)
node.LastSeen = time.Now()
} else {
newNode := &Node{
ID: nodeID,
Address: address,
LastSeen: time.Now(),
}
newNode.SetState(StateOnline)
newNodes[nodeID] = newNode
atomic.AddInt64(&cm.stats.totalNodes, 1)
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.activeNodes, 1)
printSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address)
// Если включена мастер-мастер репликация, отправляем текущее состояние новому узлу
if cm.replicationEnabled && cm.masterMaster {
go cm.sendInitialSync(nodeID)
}
// Если включён шардинг, обновляем распределение шардов
if cm.shardManager.enabled {
cm.rebalanceShards()
}
}
// sendInitialSync отправляет начальную синхронизацию новому узлу
func (cm *ClusterManager) sendInitialSync(targetNodeID string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
nodes := *(*map[string]*Node)(nodesPtr)
targetNode, exists := nodes[targetNodeID]
if !exists || targetNode.GetState() != StateOnline {
return
}
// TODO: Отправить текущее состояние базы данных новому узлу
printInfo("Отправка начальной синхронизации узлу %s", targetNodeID)
}
// handleNodeLeave обрабатывает отключение узла
func (cm *ClusterManager) handleNodeLeave(nodeID string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Создаём новую карту
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
if k == nodeID {
v.SetState(StateOffline)
newNodes[k] = v
} else {
newNodes[k] = v
}
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.activeNodes, -1)
printWarning("Узел %s покинул кластер", nodeID)
// Если включён шардинг, обновляем распределение шардов
if cm.shardManager.enabled {
cm.rebalanceShards()
}
}
// updateNodeHeartbeat обновляет время последнего heartbeat узла
func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Проверяем, нужно ли обновление
node, exists := oldNodes[nodeID]
if exists && node.GetState() == StateOnline && time.Since(node.LastSeen) < 5*time.Second {
// Обновление не требуется, просто обновляем LastSeen в существующей карте
node.LastSeen = time.Now()
return
}
// Создаём новую карту для обновления
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
newNodes[k] = v
}
if exists {
node.LastSeen = time.Now()
if node.GetState() == StateOffline {
node.SetState(StateOnline)
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
} else {
newNode := &Node{
ID: nodeID,
Address: address,
LastSeen: time.Now(),
}
newNode.SetState(StateOnline)
newNodes[nodeID] = newNode
atomic.AddInt64(&cm.stats.totalNodes, 1)
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
}
// rebalanceShards выполняет ребалансировку шардов при изменении состава кластера
func (cm *ClusterManager) rebalanceShards() {
if !cm.shardManager.enabled || !cm.IsCoordinator() {
return
}
// Получаем текущие узлы
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
nodes := *(*map[string]*Node)(nodesPtr)
// Собираем онлайн узлы
onlineNodes := make([]string, 0)
for id, node := range nodes {
if node.GetState() == StateOnline {
onlineNodes = append(onlineNodes, id)
}
}
// Здесь должна быть логика ребалансировки шардов
// В демо-версии просто перераспределяем существующие шарды
cm.shardManager.Rebalance()
atomic.AddInt64(&cm.stats.rebalancingCnt, 1)
}
// AddNode добавляет новый узел в кластер
func (cm *ClusterManager) AddNode(address string) error {
if !cm.IsCoordinator() {
return errors.New("только координатор может добавлять узлы")
}
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return errors.New("карта узлов не инициализирована")
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Генерируем ID для нового узла
nodeID := fmt.Sprintf("node-%d", len(oldNodes)+1)
// Создаём новую карту
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
newNodes[k] = v
}
newNode := &Node{
ID: nodeID,
Address: address,
LastSeen: time.Now(),
}
newNode.SetState(StateJoining)
newNodes[nodeID] = newNode
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.totalNodes, 1)
printSuccess("Узел %s (%s) добавлен в кластер", nodeID, address)
// Если включён шардинг, обновляем распределение
if cm.shardManager.enabled {
cm.rebalanceShards()
}
return nil
}
// RemoveNode удаляет узел из кластера
func (cm *ClusterManager) RemoveNode(nodeID string) error {
if !cm.IsCoordinator() {
return errors.New("только координатор может удалять узлы")
}
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return errors.New("карта узлов не инициализирована")
}
oldNodes := *(*map[string]*Node)(nodesPtr)
if _, exists := oldNodes[nodeID]; !exists {
return errors.New("узел не найден")
}
// Создаём новую карту без удаляемого узла
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
if k != nodeID {
newNodes[k] = v
}
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.totalNodes, -1)
printSuccess("Узел %s удален из кластера", nodeID)
// Если включён шардинг, обновляем распределение
if cm.shardManager.enabled {
cm.rebalanceShards()
}
return nil
}
// GetClusterStatus возвращает статус кластера
func (cm *ClusterManager) GetClusterStatus() map[string]interface{} {
status := make(map[string]interface{})
status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes)
status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes)
status["is_coordinator"] = cm.IsCoordinator()
status["coordinator"] = cm.coordinatorAddr
status["replication_enabled"] = cm.replicationEnabled
status["master_master"] = cm.masterMaster
// Добавляем информацию о шардинге
if cm.shardManager.enabled {
shardStats := cm.shardManager.GetShardStats()
status["sharding"] = shardStats
}
// Получаем текущую карту узлов атомарно
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr != nil {
nodes := *(*map[string]*Node)(nodesPtr)
nodesList := make([]map[string]interface{}, 0, len(nodes))
for id, node := range nodes {
nodesList = append(nodesList, map[string]interface{}{
"id": id,
"address": node.Address,
"state": node.GetState().String(),
"last_seen": node.LastSeen.Format(time.RFC3339),
})
}
status["nodes"] = nodesList
}
return status
}
// RebalanceCluster выполняет ребалансировку всего кластера
func (cm *ClusterManager) RebalanceCluster() error {
if !cm.IsCoordinator() {
return errors.New("only coordinator can rebalance the cluster")
}
printInfo("Starting cluster rebalance...")
// Получаем текущие узлы
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return errors.New("node map not initialized")
}
nodes := *(*map[string]*Node)(nodesPtr)
// Собираем онлайн узлы
onlineNodes := make([]string, 0)
for id, node := range nodes {
if node.GetState() == StateOnline {
onlineNodes = append(onlineNodes, id)
}
}
if len(onlineNodes) == 0 {
return errors.New("no online nodes available for rebalance")
}
// Если включён шардинг, ребалансируем шарды
if cm.shardManager.enabled {
err := cm.shardManager.Rebalance()
if err != nil {
return err
}
}
atomic.AddInt64(&cm.stats.rebalancingCnt, 1)
printSuccess("Cluster rebalance completed successfully")
return nil
}