// /futriis/internal/cluster/sharding.go // Пакет cluster реализует шардинг данных для распределённого хранения // Данный файл содержит реализацию менеджера шардинга, который управляет // распределением данных по шардам с поддержкой различных стратегий (consistent hashing, range-based, hash-based) и обеспечивает ребалансировку) package cluster import ( "crypto/md5" "encoding/hex" "errors" "fmt" "hash/crc32" "sort" "sync/atomic" "time" "futriis/pkg/utils" ) // ShardingStrategy стратегия шардинга type ShardingStrategy int const ( ConsistentHashing ShardingStrategy = iota RangeBased HashBased ) // Shard представляет шард данных type Shard struct { ID string Nodes []string // ID узлов, содержащих этот шард Range *KeyRange DataSize int64 CreatedAt time.Time stats struct { reads int64 writes int64 rebalances int64 } } // KeyRange диапазон ключей для range-based шардинга type KeyRange struct { Start string End string } // ShardManager управляет шардированием type ShardManager struct { strategy ShardingStrategy shards map[string]*Shard virtualNodes int // Количество виртуальных нод для consistent hashing hashRing []uint32 // Хэш-кольцо hashToShard map[uint32]string // Хэш -> ID шарда nodeToShards map[string][]string // Узел -> список шардов shardToNodes map[string][]string // Шард -> список узлов replicationFactor int enabled bool stats struct { totalShards int64 totalMoves int64 rebalances int64 } } // NewShardManager создаёт новый менеджер шардинга func NewShardManager(strategy ShardingStrategy, replicationFactor int, enabled bool) *ShardManager { sm := &ShardManager{ strategy: strategy, shards: make(map[string]*Shard), hashRing: make([]uint32, 0), hashToShard: make(map[uint32]string), nodeToShards: make(map[string][]string), shardToNodes: make(map[string][]string), virtualNodes: 100, // По умолчанию 100 виртуальных нод replicationFactor: replicationFactor, enabled: enabled, } if enabled { utils.PrintInfo("Шардинг активирован, стратегия: %v, фактор репликации: %d", sm.getStrategyName(), replicationFactor) } return sm } // getStrategyName возвращает название стратегии func (sm *ShardManager) getStrategyName() string { switch sm.strategy { case ConsistentHashing: return "consistent_hashing" case RangeBased: return "range_based" case HashBased: return "hash_based" default: return "unknown" } } // CreateShards создаёт начальные шарды func (sm *ShardManager) CreateShards(numShards int, nodes []string) error { if !sm.enabled { return nil } switch sm.strategy { case ConsistentHashing: return sm.createConsistentHashShards(numShards, nodes) case RangeBased: return sm.createRangeBasedShards(numShards, nodes) case HashBased: return sm.createHashBasedShards(numShards, nodes) } return nil } // createConsistentHashShards создаёт шарды для consistent hashing func (sm *ShardManager) createConsistentHashShards(numShards int, nodes []string) error { // Очищаем кольцо sm.hashRing = make([]uint32, 0) sm.hashToShard = make(map[uint32]string) // Создаём шарды for i := 0; i < numShards; i++ { shardID := fmt.Sprintf("shard-%d", i+1) shard := &Shard{ ID: shardID, Nodes: make([]string, 0), CreatedAt: time.Now(), } sm.shards[shardID] = shard // Добавляем виртуальные ноды для шарда в хэш-кольцо for j := 0; j < sm.virtualNodes; j++ { vnodeKey := fmt.Sprintf("%s:%d", shardID, j) hash := crc32.ChecksumIEEE([]byte(vnodeKey)) sm.hashRing = append(sm.hashRing, hash) sm.hashToShard[hash] = shardID } } // Сортируем кольцо sort.Slice(sm.hashRing, func(i, j int) bool { return sm.hashRing[i] < sm.hashRing[j] }) // Распределяем шарды по узлам sm.distributeShardsToNodes(nodes) atomic.AddInt64(&sm.stats.totalShards, int64(numShards)) utils.PrintSuccess("Создано %d шардов с consistent hashing", numShards) return nil } // createRangeBasedShards создаёт шарды на основе диапазонов func (sm *ShardManager) createRangeBasedShards(numShards int, nodes []string) error { // Разбиваем ключевое пространство на диапазоны // Используем hex-строки от "00" до "ff" для простоты totalRange := 256 // 0x00 - 0xff rangeSize := totalRange / numShards for i := 0; i < numShards; i++ { start := fmt.Sprintf("%02x", i*rangeSize) end := fmt.Sprintf("%02x", (i+1)*rangeSize-1) if i == numShards-1 { end = "ff" } shardID := fmt.Sprintf("range-shard-%d", i+1) shard := &Shard{ ID: shardID, Range: &KeyRange{ Start: start, End: end, }, Nodes: make([]string, 0), CreatedAt: time.Now(), } sm.shards[shardID] = shard } // Распределяем шарды по узлам sm.distributeShardsToNodes(nodes) atomic.AddInt64(&sm.stats.totalShards, int64(numShards)) utils.PrintSuccess("Создано %d range-based шардов", numShards) return nil } // createHashBasedShards создаёт шарды на основе хэша func (sm *ShardManager) createHashBasedShards(numShards int, nodes []string) error { for i := 0; i < numShards; i++ { shardID := fmt.Sprintf("hash-shard-%d", i+1) shard := &Shard{ ID: shardID, Nodes: make([]string, 0), CreatedAt: time.Now(), } sm.shards[shardID] = shard } // Распределяем шарды по узлам sm.distributeShardsToNodes(nodes) atomic.AddInt64(&sm.stats.totalShards, int64(numShards)) utils.PrintSuccess("Создано %d hash-based шардов", numShards) return nil } // distributeShardsToNodes распределяет шарды по узлам func (sm *ShardManager) distributeShardsToNodes(nodes []string) { if len(nodes) == 0 { return } nodeCount := len(nodes) shardCount := len(sm.shards) // Равномерно распределяем шарды по узлам shardsPerNode := shardCount / nodeCount remainder := shardCount % nodeCount shardIndex := 0 shardIDs := make([]string, 0, shardCount) for id := range sm.shards { shardIDs = append(shardIDs, id) } for i, nodeID := range nodes { numShardsForNode := shardsPerNode if i < remainder { numShardsForNode++ } nodeShards := make([]string, 0) for j := 0; j < numShardsForNode && shardIndex < len(shardIDs); j++ { shardID := shardIDs[shardIndex] // Добавляем узел к шарду sm.addNodeToShard(shardID, nodeID) nodeShards = append(nodeShards, shardID) shardIndex++ } sm.nodeToShards[nodeID] = nodeShards } } // addNodeToShard добавляет узел к шарду func (sm *ShardManager) addNodeToShard(shardID, nodeID string) { shard, exists := sm.shards[shardID] if !exists { return } // Добавляем узел к шарду shard.Nodes = append(shard.Nodes, nodeID) // Обновляем маппинг sm.shardToNodes[shardID] = append(sm.shardToNodes[shardID], nodeID) // Обновляем маппинг узла к шардам sm.nodeToShards[nodeID] = append(sm.nodeToShards[nodeID], shardID) } // GetShardForKey определяет шард для ключа func (sm *ShardManager) GetShardForKey(key string) (*Shard, error) { if !sm.enabled { return nil, nil } switch sm.strategy { case ConsistentHashing: return sm.getShardConsistentHashing(key) case RangeBased: return sm.getShardRangeBased(key) case HashBased: return sm.getShardHashBased(key) } return nil, errors.New("неизвестная стратегия шардинга") } // getShardConsistentHashing получает шард через consistent hashing func (sm *ShardManager) getShardConsistentHashing(key string) (*Shard, error) { if len(sm.hashRing) == 0 { return nil, errors.New("хэш-кольцо пусто") } hash := crc32.ChecksumIEEE([]byte(key)) // Бинарный поиск в кольце idx := sort.Search(len(sm.hashRing), func(i int) bool { return sm.hashRing[i] >= hash }) if idx == len(sm.hashRing) { idx = 0 } shardID := sm.hashToShard[sm.hashRing[idx]] shard, exists := sm.shards[shardID] if !exists { return nil, errors.New("шард не найден") } atomic.AddInt64(&shard.stats.reads, 1) return shard, nil } // getShardRangeBased получает шард по диапазону func (sm *ShardManager) getShardRangeBased(key string) (*Shard, error) { // Вычисляем хэш ключа для определения диапазона hash := md5.Sum([]byte(key)) keyHex := hex.EncodeToString(hash[:1]) // Используем первый байт for _, shard := range sm.shards { if shard.Range != nil { if keyHex >= shard.Range.Start && keyHex <= shard.Range.End { atomic.AddInt64(&shard.stats.reads, 1) return shard, nil } } } return nil, errors.New("шард не найден для ключа") } // getShardHashBased получает шард по хэшу func (sm *ShardManager) getShardHashBased(key string) (*Shard, error) { hash := crc32.ChecksumIEEE([]byte(key)) shardIndex := int(hash) % len(sm.shards) // Получаем отсортированный список ID шардов shardIDs := make([]string, 0, len(sm.shards)) for id := range sm.shards { shardIDs = append(shardIDs, id) } sort.Strings(shardIDs) if shardIndex >= 0 && shardIndex < len(shardIDs) { shardID := shardIDs[shardIndex] shard, exists := sm.shards[shardID] if exists { atomic.AddInt64(&shard.stats.reads, 1) return shard, nil } } return nil, errors.New("шард не найден") } // GetShardByID возвращает шард по ID func (sm *ShardManager) GetShardByID(shardID string) (*Shard, bool) { shard, exists := sm.shards[shardID] return shard, exists } // RecordWrite записывает статистику записи в шард func (sm *ShardManager) RecordWrite(shardID string) { if !sm.enabled { return } shard, exists := sm.shards[shardID] if exists { atomic.AddInt64(&shard.stats.writes, 1) } } // Rebalance выполняет ребалансировку шардов func (sm *ShardManager) Rebalance() error { if !sm.enabled { return nil } utils.PrintInfo("Запуск ребалансировки шардов...") atomic.AddInt64(&sm.stats.rebalances, 1) // Здесь должна быть логика ребалансировки // Перемещение шардов между узлами для равномерной загрузки for _, shard := range sm.shards { atomic.AddInt64(&shard.stats.rebalances, 1) } atomic.AddInt64(&sm.stats.totalMoves, int64(len(sm.shards)/2)) utils.PrintSuccess("Ребалансировка завершена") return nil } // GetShardStats возвращает статистику шардов func (sm *ShardManager) GetShardStats() map[string]interface{} { stats := make(map[string]interface{}) stats["enabled"] = sm.enabled stats["strategy"] = sm.getStrategyName() stats["total_shards"] = atomic.LoadInt64(&sm.stats.totalShards) stats["total_rebalances"] = atomic.LoadInt64(&sm.stats.rebalances) stats["total_moves"] = atomic.LoadInt64(&sm.stats.totalMoves) shardStats := make([]map[string]interface{}, 0) for id, shard := range sm.shards { sStats := map[string]interface{}{ "id": id, "nodes": shard.Nodes, "reads": atomic.LoadInt64(&shard.stats.reads), "writes": atomic.LoadInt64(&shard.stats.writes), "rebalances": atomic.LoadInt64(&shard.stats.rebalances), "created": shard.CreatedAt.Format(time.RFC3339), } if shard.Range != nil { sStats["range_start"] = shard.Range.Start sStats["range_end"] = shard.Range.End } shardStats = append(shardStats, sStats) } stats["shards"] = shardStats return stats }