// /futriis/internal/cluster/node.go // Пакет cluster реализует управление кластером, координацию узлов и репликацию данных. // Обеспечивает обнаружение узлов, heartbeat механизм для мониторинга доступности, // а также синхронную мастер-мастер репликацию между узлами кластера. // Поддерживает автоматическое переключение ролей и балансировку нагрузки. package cluster import ( "encoding/json" "errors" "fmt" "net" "sync" "sync/atomic" "time" "unsafe" "futriis/pkg/config" ) // Простые функции для логирования (без зависимостей) func printInfo(format string, args ...interface{}) { fmt.Printf("\033[34m[INFO]\033[0m %s\n", fmt.Sprintf(format, args...)) } func printSuccess(format string, args ...interface{}) { fmt.Printf("\033[32m[OK]\033[0m %s\n", fmt.Sprintf(format, args...)) } func printWarning(format string, args ...interface{}) { fmt.Printf("\033[33m[WARN]\033[0m %s\n", fmt.Sprintf(format, args...)) } func printError(format string, args ...interface{}) { fmt.Printf("\033[31m[ERROR]\033[0m %s\n", fmt.Sprintf(format, args...)) } // NodeState состояние узла type NodeState int const ( StateOffline NodeState = iota StateJoining StateOnline StateLeaving StateFailed ) func (s NodeState) String() string { switch s { case StateOffline: return "offline" case StateJoining: return "joining" case StateOnline: return "online" case StateLeaving: return "leaving" case StateFailed: return "failed" default: return "unknown" } } // ReplicationMessage представляет сообщение репликации type ReplicationMessage struct { Type string `json:"type"` // "write", "sync", "ack" Command string `json:"command"` // Команда (create, update, delete) Args []interface{} `json:"args"` // Аргументы команды Timestamp int64 `json:"timestamp"` // Временная метка NodeID string `json:"node_id"` // ID исходного узла ShardID string `json:"shard_id"` // ID шарда } // Node представляет узел кластера (с wait-free указателями) type Node struct { ID string Address string state int32 // Атомарное состояние LastSeen time.Time ShardIDs []string // Используем atomic.Value для wait-free доступа к изменяемым полям nodeState unsafe.Pointer // Атомарный указатель на карту состояний } // GetState атомарно получает состояние узла func (n *Node) GetState() NodeState { return NodeState(atomic.LoadInt32(&n.state)) } // SetState атомарно устанавливает состояние узла func (n *Node) SetState(state NodeState) { atomic.StoreInt32(&n.state, int32(state)) } // ClusterManager управляет кластером (с wait-free операциями) type ClusterManager struct { nodes unsafe.Pointer // Атомарный указатель на карту узлов coordinatorAddr string nodeID string localAddr string isCoordinator int32 // Атомарный флаг heartbeatStop chan struct{} replicationStop chan struct{} stats struct { totalNodes int64 activeNodes int64 rebalancingCnt int64 } replicationEnabled bool masterMaster bool replicationQueue chan ReplicationMessage replicationWG sync.WaitGroup shardManager *ShardManager // Менеджер шардинга } // NewClusterManager создаёт новый менеджер кластера func NewClusterManager(cfg *config.Config) *ClusterManager { // Определяем стратегию шардинга по умолчанию shardingStrategy := ConsistentHashing shardingEnabled := false initialShards := 10 // Проверяем наличие настроек шардинга в конфигурации // Используем значения по умолчанию, если поля отсутствуют cm := &ClusterManager{ coordinatorAddr: cfg.Cluster.CoordinatorAddress, nodeID: cfg.Node.ID, localAddr: cfg.Node.Address, heartbeatStop: make(chan struct{}), replicationStop: make(chan struct{}), replicationEnabled: cfg.Replication.Enabled, masterMaster: cfg.Replication.MasterMaster, replicationQueue: make(chan ReplicationMessage, 1000), } // Создаём менеджер шардинга с значениями по умолчанию cm.shardManager = NewShardManager( shardingStrategy, cfg.Cluster.ReplicationFactor, shardingEnabled, ) // Устанавливаем флаг координатора атомарно isCoord := int32(0) if cfg.Node.Address == cfg.Cluster.CoordinatorAddress { isCoord = 1 } atomic.StoreInt32(&cm.isCoordinator, isCoord) // Создаём начальную карту узлов nodes := make(map[string]*Node) // Добавляем себя в кластер selfNode := &Node{ ID: cfg.Node.ID, Address: cfg.Node.Address, LastSeen: time.Now(), } selfNode.SetState(StateOnline) nodes[cfg.Node.ID] = selfNode // Атомарно сохраняем карту узлов atomic.StorePointer(&cm.nodes, unsafe.Pointer(&nodes)) atomic.AddInt64(&cm.stats.totalNodes, 1) atomic.AddInt64(&cm.stats.activeNodes, 1) // Создаём начальные шарды если включён шардинг if shardingEnabled { // Получаем список всех узлов allNodes := make([]string, 0) for id := range nodes { allNodes = append(allNodes, id) } // Создаём шарды cm.shardManager.CreateShards(initialShards, allNodes) } // Запускаем обработчик репликации если включена мастер-мастер if cm.replicationEnabled && cm.masterMaster { go cm.startReplicationHandler() } return cm } // IsCoordinator атомарно проверяет, является ли узел координатором func (cm *ClusterManager) IsCoordinator() bool { return atomic.LoadInt32(&cm.isCoordinator) == 1 } // Start запускает кластерные сервисы func (cm *ClusterManager) Start() error { if cm.IsCoordinator() { // Запускаем координатор go cm.startCoordinator() } // Запускаем heartbeat go cm.heartbeatLoop() printInfo("Кластерный менеджер запущен") if cm.replicationEnabled && cm.masterMaster { printInfo("Мастер-мастер репликация активирована") } if cm.shardManager.enabled { printInfo("Шардинг активирован, стратегия: %v", cm.shardManager.getStrategyName()) } return nil } // Stop останавливает кластерные сервисы func (cm *ClusterManager) Stop() { close(cm.heartbeatStop) if cm.replicationEnabled && cm.masterMaster { close(cm.replicationStop) cm.replicationWG.Wait() } } // ReplicateCommand реплицирует команду на все узлы кластера с учётом шардинга func (cm *ClusterManager) ReplicateCommand(cmd string, args []interface{}, key string) error { if !cm.replicationEnabled || !cm.masterMaster { return nil // Репликация не включена } // Определяем шард для ключа var shardID string if cm.shardManager.enabled && key != "" { shard, err := cm.shardManager.GetShardForKey(key) if err == nil && shard != nil { shardID = shard.ID cm.shardManager.RecordWrite(shardID) } } msg := ReplicationMessage{ Type: "write", Command: cmd, Args: args, Timestamp: time.Now().UnixNano(), NodeID: cm.nodeID, ShardID: shardID, } // Отправляем в очередь репликации select { case cm.replicationQueue <- msg: return nil default: return errors.New("очередь репликации переполнена") } } // startReplicationHandler запускает обработчик репликации func (cm *ClusterManager) startReplicationHandler() { cm.replicationWG.Add(1) defer cm.replicationWG.Done() ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case msg := <-cm.replicationQueue: cm.sendReplicationMessage(msg) case <-ticker.C: cm.sendSyncRequest() case <-cm.replicationStop: return } } } // sendReplicationMessage отправляет сообщение репликации на другие узлы func (cm *ClusterManager) sendReplicationMessage(msg ReplicationMessage) { // Получаем текущую карту узлов атомарно nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return } nodes := *(*map[string]*Node)(nodesPtr) for id, node := range nodes { if id == cm.nodeID || node.GetState() != StateOnline { continue // Не отправляем себе и офлайн узлам } // Если есть шард, проверяем, должен ли узел получать это сообщение if msg.ShardID != "" && cm.shardManager.enabled { shard, exists := cm.shardManager.GetShardByID(msg.ShardID) if exists { shouldSend := false for _, nodeID := range shard.Nodes { if nodeID == id { shouldSend = true break } } if !shouldSend { continue } } } go func(targetNode *Node) { conn, err := net.DialTimeout("tcp", targetNode.Address, 3*time.Second) if err != nil { printWarning("Не удалось подключиться к узлу %s для репликации: %v", targetNode.ID, err) return } defer conn.Close() encoder := json.NewEncoder(conn) if err := encoder.Encode(msg); err != nil { printWarning("Ошибка отправки репликации на узел %s: %v", targetNode.ID, err) return } // Ожидаем подтверждение для синхронной репликации if msg.Type == "write" { var ack map[string]interface{} decoder := json.NewDecoder(conn) if err := decoder.Decode(&ack); err == nil { if status, ok := ack["status"].(string); ok && status == "ok" { printSuccess("Репликация на узел %s подтверждена", targetNode.ID) } } } }(node) } } // sendSyncRequest отправляет запрос синхронизации func (cm *ClusterManager) sendSyncRequest() { if !cm.IsCoordinator() { return // Только координатор инициирует синхронизацию } msg := ReplicationMessage{ Type: "sync", Timestamp: time.Now().UnixNano(), NodeID: cm.nodeID, } cm.sendReplicationMessage(msg) } // handleReplicationMessage обрабатывает входящее сообщение репликации func (cm *ClusterManager) handleReplicationMessage(conn net.Conn) { defer conn.Close() var msg ReplicationMessage decoder := json.NewDecoder(conn) if err := decoder.Decode(&msg); err != nil { return } switch msg.Type { case "write": // Получаем команду от другого узла printInfo("Получена команда репликации: %s от узла %s (шард: %s)", msg.Command, msg.NodeID, msg.ShardID) // Здесь будет вызов движка для выполнения команды // TODO: Интегрировать с engine для выполнения реплицированных команд // Отправляем подтверждение encoder := json.NewEncoder(conn) encoder.Encode(map[string]interface{}{ "status": "ok", "time": time.Now().UnixNano(), }) case "sync": // Запрос синхронизации printInfo("Получен запрос синхронизации от узла %s", msg.NodeID) // TODO: Отправить текущее состояние case "ack": // Подтверждение получения printInfo("Получено подтверждение от узла %s", msg.NodeID) } } // heartbeatLoop отправляет heartbeat сигналы func (cm *ClusterManager) heartbeatLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: cm.sendHeartbeat() case <-cm.heartbeatStop: return } } } // sendHeartbeat отправляет heartbeat координатору func (cm *ClusterManager) sendHeartbeat() { if !cm.IsCoordinator() { // Отправляем heartbeat координатору conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second) if err != nil { return } defer conn.Close() heartbeat := map[string]interface{}{ "type": "heartbeat", "node_id": cm.nodeID, "address": cm.localAddr, "time": time.Now().Unix(), } json.NewEncoder(conn).Encode(heartbeat) } } // startCoordinator запускает координатор кластера func (cm *ClusterManager) startCoordinator() { listener, err := net.Listen("tcp", cm.coordinatorAddr) if err != nil { printError("Ошибка запуска координатора: %v", err) return } defer listener.Close() printInfo("Координатор кластера запущен на " + cm.coordinatorAddr) for { conn, err := listener.Accept() if err != nil { continue } go cm.handleCoordinatorRequest(conn) } } // handleCoordinatorRequest обрабатывает запросы к координатору func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) { defer conn.Close() var req map[string]interface{} if err := json.NewDecoder(conn).Decode(&req); err != nil { return } msgType, _ := req["type"].(string) switch msgType { case "heartbeat": nodeID, _ := req["node_id"].(string) address, _ := req["address"].(string) cm.updateNodeHeartbeat(nodeID, address) case "join": nodeID, _ := req["node_id"].(string) address, _ := req["address"].(string) cm.handleNodeJoin(nodeID, address) case "leave": nodeID, _ := req["node_id"].(string) cm.handleNodeLeave(nodeID) case "replication": // Обработка сообщения репликации на координаторе cm.handleReplicationMessage(conn) } } // handleNodeJoin обрабатывает присоединение узла func (cm *ClusterManager) handleNodeJoin(nodeID, address string) { // Получаем текущую карту узлов nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return } oldNodes := *(*map[string]*Node)(nodesPtr) // Создаём новую карту newNodes := make(map[string]*Node) for k, v := range oldNodes { newNodes[k] = v } if node, exists := newNodes[nodeID]; exists { node.SetState(StateOnline) node.LastSeen = time.Now() } else { newNode := &Node{ ID: nodeID, Address: address, LastSeen: time.Now(), } newNode.SetState(StateOnline) newNodes[nodeID] = newNode atomic.AddInt64(&cm.stats.totalNodes, 1) } // Атомарно обновляем карту узлов atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes)) atomic.AddInt64(&cm.stats.activeNodes, 1) printSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address) // Если включена мастер-мастер репликация, отправляем текущее состояние новому узлу if cm.replicationEnabled && cm.masterMaster { go cm.sendInitialSync(nodeID) } // Если включён шардинг, обновляем распределение шардов if cm.shardManager.enabled { cm.rebalanceShards() } } // sendInitialSync отправляет начальную синхронизацию новому узлу func (cm *ClusterManager) sendInitialSync(targetNodeID string) { // Получаем текущую карту узлов nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return } nodes := *(*map[string]*Node)(nodesPtr) targetNode, exists := nodes[targetNodeID] if !exists || targetNode.GetState() != StateOnline { return } // TODO: Отправить текущее состояние базы данных новому узлу printInfo("Отправка начальной синхронизации узлу %s", targetNodeID) } // handleNodeLeave обрабатывает отключение узла func (cm *ClusterManager) handleNodeLeave(nodeID string) { // Получаем текущую карту узлов nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return } oldNodes := *(*map[string]*Node)(nodesPtr) // Создаём новую карту newNodes := make(map[string]*Node) for k, v := range oldNodes { if k == nodeID { v.SetState(StateOffline) newNodes[k] = v } else { newNodes[k] = v } } // Атомарно обновляем карту узлов atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes)) atomic.AddInt64(&cm.stats.activeNodes, -1) printWarning("Узел %s покинул кластер", nodeID) // Если включён шардинг, обновляем распределение шардов if cm.shardManager.enabled { cm.rebalanceShards() } } // updateNodeHeartbeat обновляет время последнего heartbeat узла func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) { // Получаем текущую карту узлов nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return } oldNodes := *(*map[string]*Node)(nodesPtr) // Проверяем, нужно ли обновление node, exists := oldNodes[nodeID] if exists && node.GetState() == StateOnline && time.Since(node.LastSeen) < 5*time.Second { // Обновление не требуется, просто обновляем LastSeen в существующей карте node.LastSeen = time.Now() return } // Создаём новую карту для обновления newNodes := make(map[string]*Node) for k, v := range oldNodes { newNodes[k] = v } if exists { node.LastSeen = time.Now() if node.GetState() == StateOffline { node.SetState(StateOnline) atomic.AddInt64(&cm.stats.activeNodes, 1) } } else { newNode := &Node{ ID: nodeID, Address: address, LastSeen: time.Now(), } newNode.SetState(StateOnline) newNodes[nodeID] = newNode atomic.AddInt64(&cm.stats.totalNodes, 1) atomic.AddInt64(&cm.stats.activeNodes, 1) } // Атомарно обновляем карту узлов atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes)) } // rebalanceShards выполняет ребалансировку шардов при изменении состава кластера func (cm *ClusterManager) rebalanceShards() { if !cm.shardManager.enabled || !cm.IsCoordinator() { return } // Получаем текущие узлы nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return } nodes := *(*map[string]*Node)(nodesPtr) // Собираем онлайн узлы onlineNodes := make([]string, 0) for id, node := range nodes { if node.GetState() == StateOnline { onlineNodes = append(onlineNodes, id) } } // Здесь должна быть логика ребалансировки шардов // В демо-версии просто перераспределяем существующие шарды cm.shardManager.Rebalance() atomic.AddInt64(&cm.stats.rebalancingCnt, 1) } // AddNode добавляет новый узел в кластер func (cm *ClusterManager) AddNode(address string) error { if !cm.IsCoordinator() { return errors.New("только координатор может добавлять узлы") } // Получаем текущую карту узлов nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return errors.New("карта узлов не инициализирована") } oldNodes := *(*map[string]*Node)(nodesPtr) // Генерируем ID для нового узла nodeID := fmt.Sprintf("node-%d", len(oldNodes)+1) // Создаём новую карту newNodes := make(map[string]*Node) for k, v := range oldNodes { newNodes[k] = v } newNode := &Node{ ID: nodeID, Address: address, LastSeen: time.Now(), } newNode.SetState(StateJoining) newNodes[nodeID] = newNode // Атомарно обновляем карту узлов atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes)) atomic.AddInt64(&cm.stats.totalNodes, 1) printSuccess("Узел %s (%s) добавлен в кластер", nodeID, address) // Если включён шардинг, обновляем распределение if cm.shardManager.enabled { cm.rebalanceShards() } return nil } // RemoveNode удаляет узел из кластера func (cm *ClusterManager) RemoveNode(nodeID string) error { if !cm.IsCoordinator() { return errors.New("только координатор может удалять узлы") } // Получаем текущую карту узлов nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return errors.New("карта узлов не инициализирована") } oldNodes := *(*map[string]*Node)(nodesPtr) if _, exists := oldNodes[nodeID]; !exists { return errors.New("узел не найден") } // Создаём новую карту без удаляемого узла newNodes := make(map[string]*Node) for k, v := range oldNodes { if k != nodeID { newNodes[k] = v } } // Атомарно обновляем карту узлов atomic.StorePointer(&cm.nodes, unsafe.Pointer(&newNodes)) atomic.AddInt64(&cm.stats.totalNodes, -1) printSuccess("Узел %s удален из кластера", nodeID) // Если включён шардинг, обновляем распределение if cm.shardManager.enabled { cm.rebalanceShards() } return nil } // GetClusterStatus возвращает статус кластера func (cm *ClusterManager) GetClusterStatus() map[string]interface{} { status := make(map[string]interface{}) status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes) status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes) status["is_coordinator"] = cm.IsCoordinator() status["coordinator"] = cm.coordinatorAddr status["replication_enabled"] = cm.replicationEnabled status["master_master"] = cm.masterMaster // Добавляем информацию о шардинге if cm.shardManager.enabled { shardStats := cm.shardManager.GetShardStats() status["sharding"] = shardStats } // Получаем текущую карту узлов атомарно nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr != nil { nodes := *(*map[string]*Node)(nodesPtr) nodesList := make([]map[string]interface{}, 0, len(nodes)) for id, node := range nodes { nodesList = append(nodesList, map[string]interface{}{ "id": id, "address": node.Address, "state": node.GetState().String(), "last_seen": node.LastSeen.Format(time.RFC3339), }) } status["nodes"] = nodesList } return status } // RebalanceCluster выполняет ребалансировку всего кластера func (cm *ClusterManager) RebalanceCluster() error { if !cm.IsCoordinator() { return errors.New("only coordinator can rebalance the cluster") } printInfo("Starting cluster rebalance...") // Получаем текущие узлы nodesPtr := atomic.LoadPointer(&cm.nodes) if nodesPtr == nil { return errors.New("node map not initialized") } nodes := *(*map[string]*Node)(nodesPtr) // Собираем онлайн узлы onlineNodes := make([]string, 0) for id, node := range nodes { if node.GetState() == StateOnline { onlineNodes = append(onlineNodes, id) } } if len(onlineNodes) == 0 { return errors.New("no online nodes available for rebalance") } // Если включён шардинг, ребалансируем шарды if cm.shardManager.enabled { err := cm.shardManager.Rebalance() if err != nil { return err } } atomic.AddInt64(&cm.stats.rebalancingCnt, 1) printSuccess("Cluster rebalance completed successfully") return nil }