futriis/internal/cluster/sharding.go
2026-02-27 22:04:04 +03:00

449 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// /futriis/internal/cluster/sharding.go
// Пакет cluster реализует шардинг данных для распределённого хранения
// Данный файл содержит реализацию менеджера шардинга, который управляет
// распределением данных по шардам с поддержкой различных стратегий (consistent hashing, range-based, hash-based) и обеспечивает ребалансировку)
package cluster
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"hash/crc32"
"sort"
"sync/atomic"
"time"
"futriis/pkg/utils"
)
// ShardingStrategy стратегия шардинга
type ShardingStrategy int
const (
ConsistentHashing ShardingStrategy = iota
RangeBased
HashBased
)
// Shard представляет шард данных
type Shard struct {
ID string
Nodes []string // ID узлов, содержащих этот шард
Range *KeyRange
DataSize int64
CreatedAt time.Time
stats struct {
reads int64
writes int64
rebalances int64
}
}
// KeyRange диапазон ключей для range-based шардинга
type KeyRange struct {
Start string
End string
}
// ShardManager управляет шардированием
type ShardManager struct {
strategy ShardingStrategy
shards map[string]*Shard
virtualNodes int // Количество виртуальных нод для consistent hashing
hashRing []uint32 // Хэш-кольцо
hashToShard map[uint32]string // Хэш -> ID шарда
nodeToShards map[string][]string // Узел -> список шардов
shardToNodes map[string][]string // Шард -> список узлов
replicationFactor int
enabled bool
stats struct {
totalShards int64
totalMoves int64
rebalances int64
}
}
// NewShardManager создаёт новый менеджер шардинга
func NewShardManager(strategy ShardingStrategy, replicationFactor int, enabled bool) *ShardManager {
sm := &ShardManager{
strategy: strategy,
shards: make(map[string]*Shard),
hashRing: make([]uint32, 0),
hashToShard: make(map[uint32]string),
nodeToShards: make(map[string][]string),
shardToNodes: make(map[string][]string),
virtualNodes: 100, // По умолчанию 100 виртуальных нод
replicationFactor: replicationFactor,
enabled: enabled,
}
if enabled {
utils.PrintInfo("Шардинг активирован, стратегия: %v, фактор репликации: %d",
sm.getStrategyName(), replicationFactor)
}
return sm
}
// getStrategyName возвращает название стратегии
func (sm *ShardManager) getStrategyName() string {
switch sm.strategy {
case ConsistentHashing:
return "consistent_hashing"
case RangeBased:
return "range_based"
case HashBased:
return "hash_based"
default:
return "unknown"
}
}
// CreateShards создаёт начальные шарды
func (sm *ShardManager) CreateShards(numShards int, nodes []string) error {
if !sm.enabled {
return nil
}
switch sm.strategy {
case ConsistentHashing:
return sm.createConsistentHashShards(numShards, nodes)
case RangeBased:
return sm.createRangeBasedShards(numShards, nodes)
case HashBased:
return sm.createHashBasedShards(numShards, nodes)
}
return nil
}
// createConsistentHashShards создаёт шарды для consistent hashing
func (sm *ShardManager) createConsistentHashShards(numShards int, nodes []string) error {
// Очищаем кольцо
sm.hashRing = make([]uint32, 0)
sm.hashToShard = make(map[uint32]string)
// Создаём шарды
for i := 0; i < numShards; i++ {
shardID := fmt.Sprintf("shard-%d", i+1)
shard := &Shard{
ID: shardID,
Nodes: make([]string, 0),
CreatedAt: time.Now(),
}
sm.shards[shardID] = shard
// Добавляем виртуальные ноды для шарда в хэш-кольцо
for j := 0; j < sm.virtualNodes; j++ {
vnodeKey := fmt.Sprintf("%s:%d", shardID, j)
hash := crc32.ChecksumIEEE([]byte(vnodeKey))
sm.hashRing = append(sm.hashRing, hash)
sm.hashToShard[hash] = shardID
}
}
// Сортируем кольцо
sort.Slice(sm.hashRing, func(i, j int) bool {
return sm.hashRing[i] < sm.hashRing[j]
})
// Распределяем шарды по узлам
sm.distributeShardsToNodes(nodes)
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
utils.PrintSuccess("Создано %d шардов с consistent hashing", numShards)
return nil
}
// createRangeBasedShards создаёт шарды на основе диапазонов
func (sm *ShardManager) createRangeBasedShards(numShards int, nodes []string) error {
// Разбиваем ключевое пространство на диапазоны
// Используем hex-строки от "00" до "ff" для простоты
totalRange := 256 // 0x00 - 0xff
rangeSize := totalRange / numShards
for i := 0; i < numShards; i++ {
start := fmt.Sprintf("%02x", i*rangeSize)
end := fmt.Sprintf("%02x", (i+1)*rangeSize-1)
if i == numShards-1 {
end = "ff"
}
shardID := fmt.Sprintf("range-shard-%d", i+1)
shard := &Shard{
ID: shardID,
Range: &KeyRange{
Start: start,
End: end,
},
Nodes: make([]string, 0),
CreatedAt: time.Now(),
}
sm.shards[shardID] = shard
}
// Распределяем шарды по узлам
sm.distributeShardsToNodes(nodes)
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
utils.PrintSuccess("Создано %d range-based шардов", numShards)
return nil
}
// createHashBasedShards создаёт шарды на основе хэша
func (sm *ShardManager) createHashBasedShards(numShards int, nodes []string) error {
for i := 0; i < numShards; i++ {
shardID := fmt.Sprintf("hash-shard-%d", i+1)
shard := &Shard{
ID: shardID,
Nodes: make([]string, 0),
CreatedAt: time.Now(),
}
sm.shards[shardID] = shard
}
// Распределяем шарды по узлам
sm.distributeShardsToNodes(nodes)
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
utils.PrintSuccess("Создано %d hash-based шардов", numShards)
return nil
}
// distributeShardsToNodes распределяет шарды по узлам
func (sm *ShardManager) distributeShardsToNodes(nodes []string) {
if len(nodes) == 0 {
return
}
nodeCount := len(nodes)
shardCount := len(sm.shards)
// Равномерно распределяем шарды по узлам
shardsPerNode := shardCount / nodeCount
remainder := shardCount % nodeCount
shardIndex := 0
shardIDs := make([]string, 0, shardCount)
for id := range sm.shards {
shardIDs = append(shardIDs, id)
}
for i, nodeID := range nodes {
numShardsForNode := shardsPerNode
if i < remainder {
numShardsForNode++
}
nodeShards := make([]string, 0)
for j := 0; j < numShardsForNode && shardIndex < len(shardIDs); j++ {
shardID := shardIDs[shardIndex]
// Добавляем узел к шарду
sm.addNodeToShard(shardID, nodeID)
nodeShards = append(nodeShards, shardID)
shardIndex++
}
sm.nodeToShards[nodeID] = nodeShards
}
}
// addNodeToShard добавляет узел к шарду
func (sm *ShardManager) addNodeToShard(shardID, nodeID string) {
shard, exists := sm.shards[shardID]
if !exists {
return
}
// Добавляем узел к шарду
shard.Nodes = append(shard.Nodes, nodeID)
// Обновляем маппинг
sm.shardToNodes[shardID] = append(sm.shardToNodes[shardID], nodeID)
// Обновляем маппинг узла к шардам
sm.nodeToShards[nodeID] = append(sm.nodeToShards[nodeID], shardID)
}
// GetShardForKey определяет шард для ключа
func (sm *ShardManager) GetShardForKey(key string) (*Shard, error) {
if !sm.enabled {
return nil, nil
}
switch sm.strategy {
case ConsistentHashing:
return sm.getShardConsistentHashing(key)
case RangeBased:
return sm.getShardRangeBased(key)
case HashBased:
return sm.getShardHashBased(key)
}
return nil, errors.New("неизвестная стратегия шардинга")
}
// getShardConsistentHashing получает шард через consistent hashing
func (sm *ShardManager) getShardConsistentHashing(key string) (*Shard, error) {
if len(sm.hashRing) == 0 {
return nil, errors.New("хэш-кольцо пусто")
}
hash := crc32.ChecksumIEEE([]byte(key))
// Бинарный поиск в кольце
idx := sort.Search(len(sm.hashRing), func(i int) bool {
return sm.hashRing[i] >= hash
})
if idx == len(sm.hashRing) {
idx = 0
}
shardID := sm.hashToShard[sm.hashRing[idx]]
shard, exists := sm.shards[shardID]
if !exists {
return nil, errors.New("шард не найден")
}
atomic.AddInt64(&shard.stats.reads, 1)
return shard, nil
}
// getShardRangeBased получает шард по диапазону
func (sm *ShardManager) getShardRangeBased(key string) (*Shard, error) {
// Вычисляем хэш ключа для определения диапазона
hash := md5.Sum([]byte(key))
keyHex := hex.EncodeToString(hash[:1]) // Используем первый байт
for _, shard := range sm.shards {
if shard.Range != nil {
if keyHex >= shard.Range.Start && keyHex <= shard.Range.End {
atomic.AddInt64(&shard.stats.reads, 1)
return shard, nil
}
}
}
return nil, errors.New("шард не найден для ключа")
}
// getShardHashBased получает шард по хэшу
func (sm *ShardManager) getShardHashBased(key string) (*Shard, error) {
hash := crc32.ChecksumIEEE([]byte(key))
shardIndex := int(hash) % len(sm.shards)
// Получаем отсортированный список ID шардов
shardIDs := make([]string, 0, len(sm.shards))
for id := range sm.shards {
shardIDs = append(shardIDs, id)
}
sort.Strings(shardIDs)
if shardIndex >= 0 && shardIndex < len(shardIDs) {
shardID := shardIDs[shardIndex]
shard, exists := sm.shards[shardID]
if exists {
atomic.AddInt64(&shard.stats.reads, 1)
return shard, nil
}
}
return nil, errors.New("шард не найден")
}
// GetShardByID возвращает шард по ID
func (sm *ShardManager) GetShardByID(shardID string) (*Shard, bool) {
shard, exists := sm.shards[shardID]
return shard, exists
}
// RecordWrite записывает статистику записи в шард
func (sm *ShardManager) RecordWrite(shardID string) {
if !sm.enabled {
return
}
shard, exists := sm.shards[shardID]
if exists {
atomic.AddInt64(&shard.stats.writes, 1)
}
}
// Rebalance выполняет ребалансировку шардов
func (sm *ShardManager) Rebalance() error {
if !sm.enabled {
return nil
}
utils.PrintInfo("Запуск ребалансировки шардов...")
atomic.AddInt64(&sm.stats.rebalances, 1)
// Здесь должна быть логика ребалансировки
// Перемещение шардов между узлами для равномерной загрузки
for _, shard := range sm.shards {
atomic.AddInt64(&shard.stats.rebalances, 1)
}
atomic.AddInt64(&sm.stats.totalMoves, int64(len(sm.shards)/2))
utils.PrintSuccess("Ребалансировка завершена")
return nil
}
// GetShardStats возвращает статистику шардов
func (sm *ShardManager) GetShardStats() map[string]interface{} {
stats := make(map[string]interface{})
stats["enabled"] = sm.enabled
stats["strategy"] = sm.getStrategyName()
stats["total_shards"] = atomic.LoadInt64(&sm.stats.totalShards)
stats["total_rebalances"] = atomic.LoadInt64(&sm.stats.rebalances)
stats["total_moves"] = atomic.LoadInt64(&sm.stats.totalMoves)
shardStats := make([]map[string]interface{}, 0)
for id, shard := range sm.shards {
sStats := map[string]interface{}{
"id": id,
"nodes": shard.Nodes,
"reads": atomic.LoadInt64(&shard.stats.reads),
"writes": atomic.LoadInt64(&shard.stats.writes),
"rebalances": atomic.LoadInt64(&shard.stats.rebalances),
"created": shard.CreatedAt.Format(time.RFC3339),
}
if shard.Range != nil {
sStats["range_start"] = shard.Range.Start
sStats["range_end"] = shard.Range.End
}
shardStats = append(shardStats, sStats)
}
stats["shards"] = shardStats
return stats
}