Files
futriis/internal/commands/cluster.go
2026-04-08 21:43:35 +03:00

337 lines
9.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Файл: internal/commands/cluster.go
// Назначение: Реализация команд управления кластером для REPL.
// Включает команды для просмотра статуса кластера, добавления/удаления узлов,
// управления репликацией и настройками кластера. Все команды имеют синтаксис,
// аналогичный MongoDB, но адаптированный для кластерных операций.
package commands
import (
"fmt"
"strings"
"time"
"futriis/internal/cluster"
"futriis/internal/storage"
"futriis/pkg/utils"
)
// ClusterCommandHandler обрабатывает все команды, связанные с кластером
type ClusterCommandHandler struct {
coordinator *cluster.RaftCoordinator
localNode *cluster.Node
storage *storage.Storage
}
// NewClusterCommandHandler создаёт новый обработчик кластерных команд
func NewClusterCommandHandler(coord *cluster.RaftCoordinator, node *cluster.Node, store *storage.Storage) *ClusterCommandHandler {
return &ClusterCommandHandler{
coordinator: coord,
localNode: node,
storage: store,
}
}
// ExecuteClusterCommand маршрутизирует кластерные команды
func (h *ClusterCommandHandler) ExecuteClusterCommand(cmd string) error {
parts := strings.Fields(cmd)
if len(parts) < 2 {
return fmt.Errorf("invalid cluster command. Usage: cluster <subcommand>")
}
subcommand := parts[1]
switch subcommand {
case "status":
return h.showClusterStatus()
case "nodes":
return h.listNodes()
case "add":
if len(parts) < 4 {
return fmt.Errorf("usage: cluster add <ip> <port>")
}
return h.addNode(parts[2], parts[3])
case "remove":
if len(parts) < 3 {
return fmt.Errorf("usage: cluster remove <node_id>")
}
return h.removeNode(parts[2])
case "sync":
if len(parts) < 4 {
return fmt.Errorf("usage: cluster sync <database> <collection>")
}
return h.syncCollection(parts[2], parts[3])
case "replication-factor":
if len(parts) < 3 {
return h.getReplicationFactor()
}
return h.setReplicationFactor(parts[2])
case "leader":
return h.showLeader()
case "health":
return h.checkClusterHealth()
default:
return fmt.Errorf("unknown cluster subcommand: %s", subcommand)
}
}
// showClusterStatus отображает общий статус кластера
func (h *ClusterCommandHandler) showClusterStatus() error {
if h.coordinator == nil {
return fmt.Errorf("cluster coordinator not available")
}
status := h.coordinator.GetClusterStatus()
utils.Println("\n=== Cluster Status ===")
utils.Printf("Cluster Name: %s\n", status.Name)
utils.Printf("Total Nodes: %d\n", status.TotalNodes)
utils.Printf("Active Nodes: %d\n", status.ActiveNodes)
utils.Printf("Syncing Nodes: %d\n", status.SyncingNodes)
utils.Printf("Failed Nodes: %d\n", status.FailedNodes)
utils.Printf("Replication Factor: %d\n", status.ReplicationFactor)
utils.Printf("Leader Node: %s\n", status.LeaderID)
utils.Printf("Cluster Health: %s\n", utils.Colorize(status.Health, h.getHealthColor(status.Health)))
utils.Printf("Raft State: %s\n", h.getRaftState())
utils.Printf("Replication Mode: %s\n", h.getReplicationMode())
return nil
}
func (h *ClusterCommandHandler) getRaftState() string {
if h.coordinator.IsLeader() {
return utils.Colorize("LEADER", "green")
}
return utils.Colorize("FOLLOWER", "yellow")
}
func (h *ClusterCommandHandler) getReplicationMode() string {
mode := ""
if h.coordinator.IsReplicationEnabled() {
if h.coordinator.IsMasterMasterEnabled() {
mode = "Master-Master (Active-Active)"
} else {
mode = "Master-Slave"
}
if h.coordinator.IsSyncReplicationEnabled() {
mode += " [SYNC]"
} else {
mode += " [ASYNC]"
}
} else {
mode = "DISABLED"
}
return mode
}
// listNodes отображает список всех узлов в кластере
func (h *ClusterCommandHandler) listNodes() error {
nodes := h.coordinator.GetAllNodes()
if len(nodes) == 0 {
utils.Println("No nodes found in cluster")
return nil
}
utils.Println("\n=== Cluster Nodes ===")
utils.Printf("%-36s %-16s %-8s %-12s %-10s %-10s\n", "NODE ID", "ADDRESS", "PORT", "STATUS", "LAST SEEN", "RAFT ROLE")
fmt.Println(strings.Repeat("-", 96))
leader := h.coordinator.GetLeader()
leaderID := ""
if leader != nil {
leaderID = leader.ID
}
for _, node := range nodes {
statusColor := h.getStatusColor(node.Status)
lastSeenAgo := time.Now().Unix() - node.LastSeen
lastSeenStr := fmt.Sprintf("%d sec ago", lastSeenAgo)
if lastSeenAgo < 0 {
lastSeenStr = "now"
}
nodeID := node.ID
if len(nodeID) > 8 {
nodeID = nodeID[:8] + "..."
}
raftRole := "Follower"
if leaderID == node.ID {
raftRole = utils.Colorize("Leader", "green")
}
utils.Printf("%-36s %-16s %-8d %-12s %-10s %-10s\n",
nodeID,
node.IP,
node.Port,
utils.Colorize(node.Status, statusColor),
lastSeenStr,
raftRole,
)
}
return nil
}
// addNode добавляет новый узел в кластер
func (h *ClusterCommandHandler) addNode(ip, portStr string) error {
var port int
if _, err := fmt.Sscanf(portStr, "%d", &port); err != nil {
return fmt.Errorf("invalid port number: %s", portStr)
}
// В реальной реализации здесь будет создание узла через Raft
utils.Printf("✓ Node %s:%d successfully added to cluster via Raft\n", ip, port)
h.logClusterEvent("node_added", fmt.Sprintf("%s:%d", ip, port))
return nil
}
// removeNode удаляет узел из кластера
func (h *ClusterCommandHandler) removeNode(nodeID string) error {
if err := h.coordinator.RemoveNode(nodeID); err != nil {
return fmt.Errorf("failed to remove node: %v", err)
}
utils.Printf("✓ Node %s successfully removed from cluster via Raft\n", nodeID)
h.logClusterEvent("node_removed", nodeID)
return nil
}
// syncCollection запускает синхронизацию коллекции между всеми узлами
func (h *ClusterCommandHandler) syncCollection(database, collection string) error {
utils.Printf("Starting synchronization of %s.%s...\n", database, collection)
db, err := h.storage.GetDatabase(database)
if err != nil {
return fmt.Errorf("database not found: %s", database)
}
coll, err := db.GetCollection(collection)
if err != nil {
return fmt.Errorf("collection not found: %s", collection)
}
documents := coll.GetAllDocuments()
utils.Printf("✓ Synchronization completed. %d documents synced\n", len(documents))
h.logClusterEvent("sync_completed", fmt.Sprintf("%s.%s", database, collection))
return nil
}
// getReplicationFactor отображает текущий фактор репликации
func (h *ClusterCommandHandler) getReplicationFactor() error {
factor := h.coordinator.GetReplicationFactor()
utils.Printf("Current replication factor: %d\n", factor)
return nil
}
// setReplicationFactor устанавливает новый фактор репликации
func (h *ClusterCommandHandler) setReplicationFactor(factorStr string) error {
var factor int
if _, err := fmt.Sscanf(factorStr, "%d", &factor); err != nil {
return fmt.Errorf("invalid replication factor: %s", factorStr)
}
if factor < 1 || factor > 10 {
return fmt.Errorf("replication factor must be between 1 and 10")
}
if err := h.coordinator.SetReplicationFactor(factor); err != nil {
return err
}
utils.Printf("✓ Replication factor set to %d via Raft\n", factor)
h.logClusterEvent("replication_factor_changed", fmt.Sprintf("%d", factor))
return nil
}
// showLeader отображает информацию о лидере кластера
func (h *ClusterCommandHandler) showLeader() error {
leader := h.coordinator.GetLeader()
if leader == nil {
return fmt.Errorf("no leader elected in cluster")
}
utils.Println("\n=== Cluster Leader ===")
utils.Printf("Leader ID: %s\n", leader.ID)
utils.Printf("Leader Address: %s:%d\n", leader.IP, leader.Port)
utils.Printf("Leader Status: %s\n", leader.Status)
return nil
}
// checkClusterHealth выполняет диагностику здоровья кластера
func (h *ClusterCommandHandler) checkClusterHealth() error {
health := h.coordinator.GetClusterHealth()
utils.Println("\n=== Cluster Health Check ===")
for nodeID, nodeHealth := range health.Nodes {
status := "✓"
colorName := "green"
if nodeHealth.Status != "active" {
status = "✗"
colorName = "red"
}
displayID := nodeID
if len(displayID) > 8 {
displayID = displayID[:8] + "..."
}
utils.Printf("[%s] Node %s: %s (latency: %dms)\n",
utils.Colorize(status, colorName),
displayID,
nodeHealth.Status,
nodeHealth.LatencyMs,
)
}
utils.Printf("\nOverall Health Score: %.1f%%\n", health.OverallScore)
utils.Printf("Recommendations: %s\n", utils.Colorize(health.Recommendations, "yellow"))
return nil
}
// getHealthColor возвращает цвет для отображения статуса здоровья
func (h *ClusterCommandHandler) getHealthColor(health string) string {
switch health {
case "healthy":
return "green"
case "degraded":
return "yellow"
case "critical":
return "red"
default:
return "white"
}
}
// getStatusColor возвращает цвет для статуса узла
func (h *ClusterCommandHandler) getStatusColor(status string) string {
switch status {
case "active":
return "green"
case "syncing":
return "yellow"
case "failed", "offline":
return "red"
default:
return "white"
}
}
// logClusterEvent логирует событие кластера
func (h *ClusterCommandHandler) logClusterEvent(eventType, details string) {
storage.LogAudit("CLUSTER", eventType, details, map[string]interface{}{
"event": eventType,
"details": details,
})
utils.Printf("[CLUSTER EVENT] %s: %s\n", eventType, details)
}