811 lines
24 KiB
Go
Raw Normal View History

2026-02-27 22:04:04 +03:00
// /futriis/internal/cluster/node.go
// Пакет cluster реализует управление кластером, координацию узлов и репликацию данных.
// Обеспечивает обнаружение узлов, heartbeat механизм для мониторинга доступности,
// а также синхронную мастер-мастер репликацию между узлами кластера.
// Поддерживает автоматическое переключение ролей и балансировку нагрузки.
package cluster
import (
"encoding/json"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
"futriis/pkg/config"
)
// Простые функции для логирования (без зависимостей)
func printInfo(format string, args ...interface{}) {
fmt.Printf("\033[34m[INFO]\033[0m %s\n", fmt.Sprintf(format, args...))
}
func printSuccess(format string, args ...interface{}) {
fmt.Printf("\033[32m[OK]\033[0m %s\n", fmt.Sprintf(format, args...))
}
func printWarning(format string, args ...interface{}) {
fmt.Printf("\033[33m[WARN]\033[0m %s\n", fmt.Sprintf(format, args...))
}
func printError(format string, args ...interface{}) {
fmt.Printf("\033[31m[ERROR]\033[0m %s\n", fmt.Sprintf(format, args...))
}
// NodeState состояние узла
type NodeState int
const (
StateOffline NodeState = iota
StateJoining
StateOnline
StateLeaving
StateFailed
)
func (s NodeState) String() string {
switch s {
case StateOffline:
return "offline"
case StateJoining:
return "joining"
case StateOnline:
return "online"
case StateLeaving:
return "leaving"
case StateFailed:
return "failed"
default:
return "unknown"
}
}
// ReplicationMessage представляет сообщение репликации
type ReplicationMessage struct {
Type string `json:"type"` // "write", "sync", "ack"
Command string `json:"command"` // Команда (create, update, delete)
Args []interface{} `json:"args"` // Аргументы команды
Timestamp int64 `json:"timestamp"` // Временная метка
NodeID string `json:"node_id"` // ID исходного узла
ShardID string `json:"shard_id"` // ID шарда
}
// Node представляет узел кластера (с wait-free указателями)
type Node struct {
ID string
Address string
state int32 // Атомарное состояние
LastSeen time.Time
ShardIDs []string
// Используем atomic.Value для wait-free доступа к изменяемым полям
nodeState unsafe.Pointer // Атомарный указатель на карту состояний
}
// GetState атомарно получает состояние узла
func (n *Node) GetState() NodeState {
return NodeState(atomic.LoadInt32(&n.state))
}
// SetState атомарно устанавливает состояние узла
func (n *Node) SetState(state NodeState) {
atomic.StoreInt32(&n.state, int32(state))
}
// ClusterManager управляет кластером (с wait-free операциями)
type ClusterManager struct {
nodes unsafe.Pointer // Атомарный указатель на карту узлов
coordinatorAddr string
nodeID string
localAddr string
isCoordinator int32 // Атомарный флаг
heartbeatStop chan struct{}
replicationStop chan struct{}
stats struct {
totalNodes int64
activeNodes int64
rebalancingCnt int64
}
replicationEnabled bool
masterMaster bool
replicationQueue chan ReplicationMessage
replicationWG sync.WaitGroup
shardManager *ShardManager // Менеджер шардинга
}
// NewClusterManager создаёт новый менеджер кластера
func NewClusterManager(cfg *config.Config) *ClusterManager {
// Определяем стратегию шардинга по умолчанию
shardingStrategy := ConsistentHashing
shardingEnabled := false
initialShards := 10
// Проверяем наличие настроек шардинга в конфигурации
// Используем значения по умолчанию, если поля отсутствуют
cm := &ClusterManager{
coordinatorAddr: cfg.Cluster.CoordinatorAddress,
nodeID: cfg.Node.ID,
localAddr: cfg.Node.Address,
heartbeatStop: make(chan struct{}),
replicationStop: make(chan struct{}),
replicationEnabled: cfg.Replication.Enabled,
masterMaster: cfg.Replication.MasterMaster,
replicationQueue: make(chan ReplicationMessage, 1000),
}
// Создаём менеджер шардинга с значениями по умолчанию
cm.shardManager = NewShardManager(
shardingStrategy,
cfg.Cluster.ReplicationFactor,
shardingEnabled,
)
// Устанавливаем флаг координатора атомарно
isCoord := int32(0)
if cfg.Node.Address == cfg.Cluster.CoordinatorAddress {
isCoord = 1
}
atomic.StoreInt32(&cm.isCoordinator, isCoord)
// Создаём начальную карту узлов
nodes := make(map[string]*Node)
// Добавляем себя в кластер
selfNode := &Node{
ID: cfg.Node.ID,
Address: cfg.Node.Address,
LastSeen: time.Now(),
}
selfNode.SetState(StateOnline)
nodes[cfg.Node.ID] = selfNode
// Атомарно сохраняем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&nodes))
atomic.AddInt64(&cm.stats.totalNodes, 1)
atomic.AddInt64(&cm.stats.activeNodes, 1)
// Создаём начальные шарды если включён шардинг
if shardingEnabled {
// Получаем список всех узлов
allNodes := make([]string, 0)
for id := range nodes {
allNodes = append(allNodes, id)
}
// Создаём шарды
cm.shardManager.CreateShards(initialShards, allNodes)
}
// Запускаем обработчик репликации если включена мастер-мастер
if cm.replicationEnabled && cm.masterMaster {
go cm.startReplicationHandler()
}
return cm
}
// IsCoordinator атомарно проверяет, является ли узел координатором
func (cm *ClusterManager) IsCoordinator() bool {
return atomic.LoadInt32(&cm.isCoordinator) == 1
}
// Start запускает кластерные сервисы
func (cm *ClusterManager) Start() error {
if cm.IsCoordinator() {
// Запускаем координатор
go cm.startCoordinator()
}
// Запускаем heartbeat
go cm.heartbeatLoop()
printInfo("Кластерный менеджер запущен")
if cm.replicationEnabled && cm.masterMaster {
printInfo("Мастер-мастер репликация активирована")
}
if cm.shardManager.enabled {
printInfo("Шардинг активирован, стратегия: %v", cm.shardManager.getStrategyName())
}
return nil
}
// Stop останавливает кластерные сервисы
func (cm *ClusterManager) Stop() {
close(cm.heartbeatStop)
if cm.replicationEnabled && cm.masterMaster {
close(cm.replicationStop)
cm.replicationWG.Wait()
}
}
// ReplicateCommand реплицирует команду на все узлы кластера с учётом шардинга
func (cm *ClusterManager) ReplicateCommand(cmd string, args []interface{}, key string) error {
if !cm.replicationEnabled || !cm.masterMaster {
return nil // Репликация не включена
}
// Определяем шард для ключа
var shardID string
if cm.shardManager.enabled && key != "" {
shard, err := cm.shardManager.GetShardForKey(key)
if err == nil && shard != nil {
shardID = shard.ID
cm.shardManager.RecordWrite(shardID)
}
}
msg := ReplicationMessage{
Type: "write",
Command: cmd,
Args: args,
Timestamp: time.Now().UnixNano(),
NodeID: cm.nodeID,
ShardID: shardID,
}
// Отправляем в очередь репликации
select {
case cm.replicationQueue <- msg:
return nil
default:
return errors.New("очередь репликации переполнена")
}
}
// startReplicationHandler запускает обработчик репликации
func (cm *ClusterManager) startReplicationHandler() {
cm.replicationWG.Add(1)
defer cm.replicationWG.Done()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case msg := <-cm.replicationQueue:
cm.sendReplicationMessage(msg)
case <-ticker.C:
cm.sendSyncRequest()
case <-cm.replicationStop:
return
}
}
}
// sendReplicationMessage отправляет сообщение репликации на другие узлы
func (cm *ClusterManager) sendReplicationMessage(msg ReplicationMessage) {
// Получаем текущую карту узлов атомарно
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
nodes := *(*map[string]*Node)(nodesPtr)
for id, node := range nodes {
if id == cm.nodeID || node.GetState() != StateOnline {
continue // Не отправляем себе и офлайн узлам
}
// Если есть шард, проверяем, должен ли узел получать это сообщение
if msg.ShardID != "" && cm.shardManager.enabled {
shard, exists := cm.shardManager.GetShardByID(msg.ShardID)
if exists {
shouldSend := false
for _, nodeID := range shard.Nodes {
if nodeID == id {
shouldSend = true
break
}
}
if !shouldSend {
continue
}
}
}
go func(targetNode *Node) {
conn, err := net.DialTimeout("tcp", targetNode.Address, 3*time.Second)
if err != nil {
printWarning("Не удалось подключиться к узлу %s для репликации: %v", targetNode.ID, err)
return
}
defer conn.Close()
encoder := json.NewEncoder(conn)
if err := encoder.Encode(msg); err != nil {
printWarning("Ошибка отправки репликации на узел %s: %v", targetNode.ID, err)
return
}
// Ожидаем подтверждение для синхронной репликации
if msg.Type == "write" {
var ack map[string]interface{}
decoder := json.NewDecoder(conn)
if err := decoder.Decode(&ack); err == nil {
if status, ok := ack["status"].(string); ok && status == "ok" {
printSuccess("Репликация на узел %s подтверждена", targetNode.ID)
}
}
}
}(node)
}
}
// sendSyncRequest отправляет запрос синхронизации
func (cm *ClusterManager) sendSyncRequest() {
if !cm.IsCoordinator() {
return // Только координатор инициирует синхронизацию
}
msg := ReplicationMessage{
Type: "sync",
Timestamp: time.Now().UnixNano(),
NodeID: cm.nodeID,
}
cm.sendReplicationMessage(msg)
}
// handleReplicationMessage обрабатывает входящее сообщение репликации
func (cm *ClusterManager) handleReplicationMessage(conn net.Conn) {
defer conn.Close()
var msg ReplicationMessage
decoder := json.NewDecoder(conn)
if err := decoder.Decode(&msg); err != nil {
return
}
switch msg.Type {
case "write":
// Получаем команду от другого узла
printInfo("Получена команда репликации: %s от узла %s (шард: %s)",
msg.Command, msg.NodeID, msg.ShardID)
// Здесь будет вызов движка для выполнения команды
// TODO: Интегрировать с engine для выполнения реплицированных команд
// Отправляем подтверждение
encoder := json.NewEncoder(conn)
encoder.Encode(map[string]interface{}{
"status": "ok",
"time": time.Now().UnixNano(),
})
case "sync":
// Запрос синхронизации
printInfo("Получен запрос синхронизации от узла %s", msg.NodeID)
// TODO: Отправить текущее состояние
case "ack":
// Подтверждение получения
printInfo("Получено подтверждение от узла %s", msg.NodeID)
}
}
// heartbeatLoop отправляет heartbeat сигналы
func (cm *ClusterManager) heartbeatLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cm.sendHeartbeat()
case <-cm.heartbeatStop:
return
}
}
}
// sendHeartbeat отправляет heartbeat координатору
func (cm *ClusterManager) sendHeartbeat() {
if !cm.IsCoordinator() {
// Отправляем heartbeat координатору
conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second)
if err != nil {
return
}
defer conn.Close()
heartbeat := map[string]interface{}{
"type": "heartbeat",
"node_id": cm.nodeID,
"address": cm.localAddr,
"time": time.Now().Unix(),
}
json.NewEncoder(conn).Encode(heartbeat)
}
}
// startCoordinator запускает координатор кластера
func (cm *ClusterManager) startCoordinator() {
listener, err := net.Listen("tcp", cm.coordinatorAddr)
if err != nil {
printError("Ошибка запуска координатора: %v", err)
return
}
defer listener.Close()
printInfo("Координатор кластера запущен на " + cm.coordinatorAddr)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go cm.handleCoordinatorRequest(conn)
}
}
// handleCoordinatorRequest обрабатывает запросы к координатору
func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) {
defer conn.Close()
var req map[string]interface{}
if err := json.NewDecoder(conn).Decode(&req); err != nil {
return
}
msgType, _ := req["type"].(string)
switch msgType {
case "heartbeat":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.updateNodeHeartbeat(nodeID, address)
case "join":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.handleNodeJoin(nodeID, address)
case "leave":
nodeID, _ := req["node_id"].(string)
cm.handleNodeLeave(nodeID)
case "replication":
// Обработка сообщения репликации на координаторе
cm.handleReplicationMessage(conn)
}
}
// handleNodeJoin обрабатывает присоединение узла
func (cm *ClusterManager) handleNodeJoin(nodeID, address string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Создаём новую карту
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
newNodes[k] = v
}
if node, exists := newNodes[nodeID]; exists {
node.SetState(StateOnline)
node.LastSeen = time.Now()
} else {
newNode := &Node{
ID: nodeID,
Address: address,
LastSeen: time.Now(),
}
newNode.SetState(StateOnline)
newNodes[nodeID] = newNode
atomic.AddInt64(&cm.stats.totalNodes, 1)
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.activeNodes, 1)
printSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address)
// Если включена мастер-мастер репликация, отправляем текущее состояние новому узлу
if cm.replicationEnabled && cm.masterMaster {
go cm.sendInitialSync(nodeID)
}
// Если включён шардинг, обновляем распределение шардов
if cm.shardManager.enabled {
cm.rebalanceShards()
}
}
// sendInitialSync отправляет начальную синхронизацию новому узлу
func (cm *ClusterManager) sendInitialSync(targetNodeID string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
nodes := *(*map[string]*Node)(nodesPtr)
targetNode, exists := nodes[targetNodeID]
if !exists || targetNode.GetState() != StateOnline {
return
}
// TODO: Отправить текущее состояние базы данных новому узлу
printInfo("Отправка начальной синхронизации узлу %s", targetNodeID)
}
// handleNodeLeave обрабатывает отключение узла
func (cm *ClusterManager) handleNodeLeave(nodeID string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Создаём новую карту
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
if k == nodeID {
v.SetState(StateOffline)
newNodes[k] = v
} else {
newNodes[k] = v
}
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.activeNodes, -1)
printWarning("Узел %s покинул кластер", nodeID)
// Если включён шардинг, обновляем распределение шардов
if cm.shardManager.enabled {
cm.rebalanceShards()
}
}
// updateNodeHeartbeat обновляет время последнего heartbeat узла
func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) {
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Проверяем, нужно ли обновление
node, exists := oldNodes[nodeID]
if exists && node.GetState() == StateOnline && time.Since(node.LastSeen) < 5*time.Second {
// Обновление не требуется, просто обновляем LastSeen в существующей карте
node.LastSeen = time.Now()
return
}
// Создаём новую карту для обновления
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
newNodes[k] = v
}
if exists {
node.LastSeen = time.Now()
if node.GetState() == StateOffline {
node.SetState(StateOnline)
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
} else {
newNode := &Node{
ID: nodeID,
Address: address,
LastSeen: time.Now(),
}
newNode.SetState(StateOnline)
newNodes[nodeID] = newNode
atomic.AddInt64(&cm.stats.totalNodes, 1)
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
}
// rebalanceShards выполняет ребалансировку шардов при изменении состава кластера
func (cm *ClusterManager) rebalanceShards() {
if !cm.shardManager.enabled || !cm.IsCoordinator() {
return
}
// Получаем текущие узлы
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return
}
nodes := *(*map[string]*Node)(nodesPtr)
// Собираем онлайн узлы
onlineNodes := make([]string, 0)
for id, node := range nodes {
if node.GetState() == StateOnline {
onlineNodes = append(onlineNodes, id)
}
}
// Здесь должна быть логика ребалансировки шардов
// В демо-версии просто перераспределяем существующие шарды
cm.shardManager.Rebalance()
atomic.AddInt64(&cm.stats.rebalancingCnt, 1)
}
// AddNode добавляет новый узел в кластер
func (cm *ClusterManager) AddNode(address string) error {
if !cm.IsCoordinator() {
return errors.New("только координатор может добавлять узлы")
}
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return errors.New("карта узлов не инициализирована")
}
oldNodes := *(*map[string]*Node)(nodesPtr)
// Генерируем ID для нового узла
nodeID := fmt.Sprintf("node-%d", len(oldNodes)+1)
// Создаём новую карту
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
newNodes[k] = v
}
newNode := &Node{
ID: nodeID,
Address: address,
LastSeen: time.Now(),
}
newNode.SetState(StateJoining)
newNodes[nodeID] = newNode
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.totalNodes, 1)
printSuccess("Узел %s (%s) добавлен в кластер", nodeID, address)
// Если включён шардинг, обновляем распределение
if cm.shardManager.enabled {
cm.rebalanceShards()
}
return nil
}
// RemoveNode удаляет узел из кластера
func (cm *ClusterManager) RemoveNode(nodeID string) error {
if !cm.IsCoordinator() {
return errors.New("только координатор может удалять узлы")
}
// Получаем текущую карту узлов
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return errors.New("карта узлов не инициализирована")
}
oldNodes := *(*map[string]*Node)(nodesPtr)
if _, exists := oldNodes[nodeID]; !exists {
return errors.New("узел не найден")
}
// Создаём новую карту без удаляемого узла
newNodes := make(map[string]*Node)
for k, v := range oldNodes {
if k != nodeID {
newNodes[k] = v
}
}
// Атомарно обновляем карту узлов
atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes))
atomic.AddInt64(&cm.stats.totalNodes, -1)
printSuccess("Узел %s удален из кластера", nodeID)
// Если включён шардинг, обновляем распределение
if cm.shardManager.enabled {
cm.rebalanceShards()
}
return nil
}
// GetClusterStatus возвращает статус кластера
func (cm *ClusterManager) GetClusterStatus() map[string]interface{} {
status := make(map[string]interface{})
status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes)
status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes)
status["is_coordinator"] = cm.IsCoordinator()
status["coordinator"] = cm.coordinatorAddr
status["replication_enabled"] = cm.replicationEnabled
status["master_master"] = cm.masterMaster
// Добавляем информацию о шардинге
if cm.shardManager.enabled {
shardStats := cm.shardManager.GetShardStats()
status["sharding"] = shardStats
}
// Получаем текущую карту узлов атомарно
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr != nil {
nodes := *(*map[string]*Node)(nodesPtr)
nodesList := make([]map[string]interface{}, 0, len(nodes))
for id, node := range nodes {
nodesList = append(nodesList, map[string]interface{}{
"id": id,
"address": node.Address,
"state": node.GetState().String(),
"last_seen": node.LastSeen.Format(time.RFC3339),
})
}
status["nodes"] = nodesList
}
return status
}
// RebalanceCluster выполняет ребалансировку всего кластера
func (cm *ClusterManager) RebalanceCluster() error {
if !cm.IsCoordinator() {
return errors.New("only coordinator can rebalance the cluster")
}
printInfo("Starting cluster rebalance...")
// Получаем текущие узлы
nodesPtr := atomic.LoadPointer(&cm.nodes)
if nodesPtr == nil {
return errors.New("node map not initialized")
}
nodes := *(*map[string]*Node)(nodesPtr)
// Собираем онлайн узлы
onlineNodes := make([]string, 0)
for id, node := range nodes {
if node.GetState() == StateOnline {
onlineNodes = append(onlineNodes, id)
}
}
if len(onlineNodes) == 0 {
return errors.New("no online nodes available for rebalance")
}
// Если включён шардинг, ребалансируем шарды
if cm.shardManager.enabled {
err := cm.shardManager.Rebalance()
if err != nil {
return err
}
}
atomic.AddInt64(&cm.stats.rebalancingCnt, 1)
printSuccess("Cluster rebalance completed successfully")
return nil
}