futriis/internal/cluster/sharding.go

449 lines
12 KiB
Go
Raw Normal View History

2026-02-27 22:04:04 +03:00
// /futriis/internal/cluster/sharding.go
// Пакет cluster реализует шардинг данных для распределённого хранения
// Данный файл содержит реализацию менеджера шардинга, который управляет
// распределением данных по шардам с поддержкой различных стратегий (consistent hashing, range-based, hash-based) и обеспечивает ребалансировку)
package cluster
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"hash/crc32"
"sort"
"sync/atomic"
"time"
"futriis/pkg/utils"
)
// ShardingStrategy стратегия шардинга
type ShardingStrategy int
const (
ConsistentHashing ShardingStrategy = iota
RangeBased
HashBased
)
// Shard представляет шард данных
type Shard struct {
ID string
Nodes []string // ID узлов, содержащих этот шард
Range *KeyRange
DataSize int64
CreatedAt time.Time
stats struct {
reads int64
writes int64
rebalances int64
}
}
// KeyRange диапазон ключей для range-based шардинга
type KeyRange struct {
Start string
End string
}
// ShardManager управляет шардированием
type ShardManager struct {
strategy ShardingStrategy
shards map[string]*Shard
virtualNodes int // Количество виртуальных нод для consistent hashing
hashRing []uint32 // Хэш-кольцо
hashToShard map[uint32]string // Хэш -> ID шарда
nodeToShards map[string][]string // Узел -> список шардов
shardToNodes map[string][]string // Шард -> список узлов
replicationFactor int
enabled bool
stats struct {
totalShards int64
totalMoves int64
rebalances int64
}
}
// NewShardManager создаёт новый менеджер шардинга
func NewShardManager(strategy ShardingStrategy, replicationFactor int, enabled bool) *ShardManager {
sm := &ShardManager{
strategy: strategy,
shards: make(map[string]*Shard),
hashRing: make([]uint32, 0),
hashToShard: make(map[uint32]string),
nodeToShards: make(map[string][]string),
shardToNodes: make(map[string][]string),
virtualNodes: 100, // По умолчанию 100 виртуальных нод
replicationFactor: replicationFactor,
enabled: enabled,
}
if enabled {
utils.PrintInfo("Шардинг активирован, стратегия: %v, фактор репликации: %d",
sm.getStrategyName(), replicationFactor)
}
return sm
}
// getStrategyName возвращает название стратегии
func (sm *ShardManager) getStrategyName() string {
switch sm.strategy {
case ConsistentHashing:
return "consistent_hashing"
case RangeBased:
return "range_based"
case HashBased:
return "hash_based"
default:
return "unknown"
}
}
// CreateShards создаёт начальные шарды
func (sm *ShardManager) CreateShards(numShards int, nodes []string) error {
if !sm.enabled {
return nil
}
switch sm.strategy {
case ConsistentHashing:
return sm.createConsistentHashShards(numShards, nodes)
case RangeBased:
return sm.createRangeBasedShards(numShards, nodes)
case HashBased:
return sm.createHashBasedShards(numShards, nodes)
}
return nil
}
// createConsistentHashShards создаёт шарды для consistent hashing
func (sm *ShardManager) createConsistentHashShards(numShards int, nodes []string) error {
// Очищаем кольцо
sm.hashRing = make([]uint32, 0)
sm.hashToShard = make(map[uint32]string)
// Создаём шарды
for i := 0; i < numShards; i++ {
shardID := fmt.Sprintf("shard-%d", i+1)
shard := &Shard{
ID: shardID,
Nodes: make([]string, 0),
CreatedAt: time.Now(),
}
sm.shards[shardID] = shard
// Добавляем виртуальные ноды для шарда в хэш-кольцо
for j := 0; j < sm.virtualNodes; j++ {
vnodeKey := fmt.Sprintf("%s:%d", shardID, j)
hash := crc32.ChecksumIEEE([]byte(vnodeKey))
sm.hashRing = append(sm.hashRing, hash)
sm.hashToShard[hash] = shardID
}
}
// Сортируем кольцо
sort.Slice(sm.hashRing, func(i, j int) bool {
return sm.hashRing[i] < sm.hashRing[j]
})
// Распределяем шарды по узлам
sm.distributeShardsToNodes(nodes)
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
utils.PrintSuccess("Создано %d шардов с consistent hashing", numShards)
return nil
}
// createRangeBasedShards создаёт шарды на основе диапазонов
func (sm *ShardManager) createRangeBasedShards(numShards int, nodes []string) error {
// Разбиваем ключевое пространство на диапазоны
// Используем hex-строки от "00" до "ff" для простоты
totalRange := 256 // 0x00 - 0xff
rangeSize := totalRange / numShards
for i := 0; i < numShards; i++ {
start := fmt.Sprintf("%02x", i*rangeSize)
end := fmt.Sprintf("%02x", (i+1)*rangeSize-1)
if i == numShards-1 {
end = "ff"
}
shardID := fmt.Sprintf("range-shard-%d", i+1)
shard := &Shard{
ID: shardID,
Range: &KeyRange{
Start: start,
End: end,
},
Nodes: make([]string, 0),
CreatedAt: time.Now(),
}
sm.shards[shardID] = shard
}
// Распределяем шарды по узлам
sm.distributeShardsToNodes(nodes)
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
utils.PrintSuccess("Создано %d range-based шардов", numShards)
return nil
}
// createHashBasedShards создаёт шарды на основе хэша
func (sm *ShardManager) createHashBasedShards(numShards int, nodes []string) error {
for i := 0; i < numShards; i++ {
shardID := fmt.Sprintf("hash-shard-%d", i+1)
shard := &Shard{
ID: shardID,
Nodes: make([]string, 0),
CreatedAt: time.Now(),
}
sm.shards[shardID] = shard
}
// Распределяем шарды по узлам
sm.distributeShardsToNodes(nodes)
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
utils.PrintSuccess("Создано %d hash-based шардов", numShards)
return nil
}
// distributeShardsToNodes распределяет шарды по узлам
func (sm *ShardManager) distributeShardsToNodes(nodes []string) {
if len(nodes) == 0 {
return
}
nodeCount := len(nodes)
shardCount := len(sm.shards)
// Равномерно распределяем шарды по узлам
shardsPerNode := shardCount / nodeCount
remainder := shardCount % nodeCount
shardIndex := 0
shardIDs := make([]string, 0, shardCount)
for id := range sm.shards {
shardIDs = append(shardIDs, id)
}
for i, nodeID := range nodes {
numShardsForNode := shardsPerNode
if i < remainder {
numShardsForNode++
}
nodeShards := make([]string, 0)
for j := 0; j < numShardsForNode && shardIndex < len(shardIDs); j++ {
shardID := shardIDs[shardIndex]
// Добавляем узел к шарду
sm.addNodeToShard(shardID, nodeID)
nodeShards = append(nodeShards, shardID)
shardIndex++
}
sm.nodeToShards[nodeID] = nodeShards
}
}
// addNodeToShard добавляет узел к шарду
func (sm *ShardManager) addNodeToShard(shardID, nodeID string) {
shard, exists := sm.shards[shardID]
if !exists {
return
}
// Добавляем узел к шарду
shard.Nodes = append(shard.Nodes, nodeID)
// Обновляем маппинг
sm.shardToNodes[shardID] = append(sm.shardToNodes[shardID], nodeID)
// Обновляем маппинг узла к шардам
sm.nodeToShards[nodeID] = append(sm.nodeToShards[nodeID], shardID)
}
// GetShardForKey определяет шард для ключа
func (sm *ShardManager) GetShardForKey(key string) (*Shard, error) {
if !sm.enabled {
return nil, nil
}
switch sm.strategy {
case ConsistentHashing:
return sm.getShardConsistentHashing(key)
case RangeBased:
return sm.getShardRangeBased(key)
case HashBased:
return sm.getShardHashBased(key)
}
return nil, errors.New("неизвестная стратегия шардинга")
}
// getShardConsistentHashing получает шард через consistent hashing
func (sm *ShardManager) getShardConsistentHashing(key string) (*Shard, error) {
if len(sm.hashRing) == 0 {
return nil, errors.New("хэш-кольцо пусто")
}
hash := crc32.ChecksumIEEE([]byte(key))
// Бинарный поиск в кольце
idx := sort.Search(len(sm.hashRing), func(i int) bool {
return sm.hashRing[i] >= hash
})
if idx == len(sm.hashRing) {
idx = 0
}
shardID := sm.hashToShard[sm.hashRing[idx]]
shard, exists := sm.shards[shardID]
if !exists {
return nil, errors.New("шард не найден")
}
atomic.AddInt64(&shard.stats.reads, 1)
return shard, nil
}
// getShardRangeBased получает шард по диапазону
func (sm *ShardManager) getShardRangeBased(key string) (*Shard, error) {
// Вычисляем хэш ключа для определения диапазона
hash := md5.Sum([]byte(key))
keyHex := hex.EncodeToString(hash[:1]) // Используем первый байт
for _, shard := range sm.shards {
if shard.Range != nil {
if keyHex >= shard.Range.Start && keyHex <= shard.Range.End {
atomic.AddInt64(&shard.stats.reads, 1)
return shard, nil
}
}
}
return nil, errors.New("шард не найден для ключа")
}
// getShardHashBased получает шард по хэшу
func (sm *ShardManager) getShardHashBased(key string) (*Shard, error) {
hash := crc32.ChecksumIEEE([]byte(key))
shardIndex := int(hash) % len(sm.shards)
// Получаем отсортированный список ID шардов
shardIDs := make([]string, 0, len(sm.shards))
for id := range sm.shards {
shardIDs = append(shardIDs, id)
}
sort.Strings(shardIDs)
if shardIndex >= 0 && shardIndex < len(shardIDs) {
shardID := shardIDs[shardIndex]
shard, exists := sm.shards[shardID]
if exists {
atomic.AddInt64(&shard.stats.reads, 1)
return shard, nil
}
}
return nil, errors.New("шард не найден")
}
// GetShardByID возвращает шард по ID
func (sm *ShardManager) GetShardByID(shardID string) (*Shard, bool) {
shard, exists := sm.shards[shardID]
return shard, exists
}
// RecordWrite записывает статистику записи в шард
func (sm *ShardManager) RecordWrite(shardID string) {
if !sm.enabled {
return
}
shard, exists := sm.shards[shardID]
if exists {
atomic.AddInt64(&shard.stats.writes, 1)
}
}
// Rebalance выполняет ребалансировку шардов
func (sm *ShardManager) Rebalance() error {
if !sm.enabled {
return nil
}
utils.PrintInfo("Запуск ребалансировки шардов...")
atomic.AddInt64(&sm.stats.rebalances, 1)
// Здесь должна быть логика ребалансировки
// Перемещение шардов между узлами для равномерной загрузки
for _, shard := range sm.shards {
atomic.AddInt64(&shard.stats.rebalances, 1)
}
atomic.AddInt64(&sm.stats.totalMoves, int64(len(sm.shards)/2))
utils.PrintSuccess("Ребалансировка завершена")
return nil
}
// GetShardStats возвращает статистику шардов
func (sm *ShardManager) GetShardStats() map[string]interface{} {
stats := make(map[string]interface{})
stats["enabled"] = sm.enabled
stats["strategy"] = sm.getStrategyName()
stats["total_shards"] = atomic.LoadInt64(&sm.stats.totalShards)
stats["total_rebalances"] = atomic.LoadInt64(&sm.stats.rebalances)
stats["total_moves"] = atomic.LoadInt64(&sm.stats.totalMoves)
shardStats := make([]map[string]interface{}, 0)
for id, shard := range sm.shards {
sStats := map[string]interface{}{
"id": id,
"nodes": shard.Nodes,
"reads": atomic.LoadInt64(&shard.stats.reads),
"writes": atomic.LoadInt64(&shard.stats.writes),
"rebalances": atomic.LoadInt64(&shard.stats.rebalances),
"created": shard.CreatedAt.Format(time.RFC3339),
}
if shard.Range != nil {
sStats["range_start"] = shard.Range.Start
sStats["range_end"] = shard.Range.End
}
shardStats = append(shardStats, sStats)
}
stats["shards"] = shardStats
return stats
}