449 lines
12 KiB
Go
449 lines
12 KiB
Go
// /futriis/internal/cluster/sharding.go
|
||
// Пакет cluster реализует шардинг данных для распределённого хранения
|
||
// Данный файл содержит реализацию менеджера шардинга, который управляет
|
||
// распределением данных по шардам с поддержкой различных стратегий (consistent hashing, range-based, hash-based) и обеспечивает ребалансировку)
|
||
|
||
package cluster
|
||
|
||
import (
|
||
"crypto/md5"
|
||
"encoding/hex"
|
||
"errors"
|
||
"fmt"
|
||
"hash/crc32"
|
||
"sort"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"futriis/pkg/utils"
|
||
)
|
||
|
||
// ShardingStrategy стратегия шардинга
|
||
type ShardingStrategy int
|
||
|
||
const (
|
||
ConsistentHashing ShardingStrategy = iota
|
||
RangeBased
|
||
HashBased
|
||
)
|
||
|
||
// Shard представляет шард данных
|
||
type Shard struct {
|
||
ID string
|
||
Nodes []string // ID узлов, содержащих этот шард
|
||
Range *KeyRange
|
||
DataSize int64
|
||
CreatedAt time.Time
|
||
stats struct {
|
||
reads int64
|
||
writes int64
|
||
rebalances int64
|
||
}
|
||
}
|
||
|
||
// KeyRange диапазон ключей для range-based шардинга
|
||
type KeyRange struct {
|
||
Start string
|
||
End string
|
||
}
|
||
|
||
// ShardManager управляет шардированием
|
||
type ShardManager struct {
|
||
strategy ShardingStrategy
|
||
shards map[string]*Shard
|
||
virtualNodes int // Количество виртуальных нод для consistent hashing
|
||
hashRing []uint32 // Хэш-кольцо
|
||
hashToShard map[uint32]string // Хэш -> ID шарда
|
||
nodeToShards map[string][]string // Узел -> список шардов
|
||
shardToNodes map[string][]string // Шард -> список узлов
|
||
replicationFactor int
|
||
enabled bool
|
||
stats struct {
|
||
totalShards int64
|
||
totalMoves int64
|
||
rebalances int64
|
||
}
|
||
}
|
||
|
||
// NewShardManager создаёт новый менеджер шардинга
|
||
func NewShardManager(strategy ShardingStrategy, replicationFactor int, enabled bool) *ShardManager {
|
||
sm := &ShardManager{
|
||
strategy: strategy,
|
||
shards: make(map[string]*Shard),
|
||
hashRing: make([]uint32, 0),
|
||
hashToShard: make(map[uint32]string),
|
||
nodeToShards: make(map[string][]string),
|
||
shardToNodes: make(map[string][]string),
|
||
virtualNodes: 100, // По умолчанию 100 виртуальных нод
|
||
replicationFactor: replicationFactor,
|
||
enabled: enabled,
|
||
}
|
||
|
||
if enabled {
|
||
utils.PrintInfo("Шардинг активирован, стратегия: %v, фактор репликации: %d",
|
||
sm.getStrategyName(), replicationFactor)
|
||
}
|
||
|
||
return sm
|
||
}
|
||
|
||
// getStrategyName возвращает название стратегии
|
||
func (sm *ShardManager) getStrategyName() string {
|
||
switch sm.strategy {
|
||
case ConsistentHashing:
|
||
return "consistent_hashing"
|
||
case RangeBased:
|
||
return "range_based"
|
||
case HashBased:
|
||
return "hash_based"
|
||
default:
|
||
return "unknown"
|
||
}
|
||
}
|
||
|
||
// CreateShards создаёт начальные шарды
|
||
func (sm *ShardManager) CreateShards(numShards int, nodes []string) error {
|
||
if !sm.enabled {
|
||
return nil
|
||
}
|
||
|
||
switch sm.strategy {
|
||
case ConsistentHashing:
|
||
return sm.createConsistentHashShards(numShards, nodes)
|
||
case RangeBased:
|
||
return sm.createRangeBasedShards(numShards, nodes)
|
||
case HashBased:
|
||
return sm.createHashBasedShards(numShards, nodes)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// createConsistentHashShards создаёт шарды для consistent hashing
|
||
func (sm *ShardManager) createConsistentHashShards(numShards int, nodes []string) error {
|
||
// Очищаем кольцо
|
||
sm.hashRing = make([]uint32, 0)
|
||
sm.hashToShard = make(map[uint32]string)
|
||
|
||
// Создаём шарды
|
||
for i := 0; i < numShards; i++ {
|
||
shardID := fmt.Sprintf("shard-%d", i+1)
|
||
|
||
shard := &Shard{
|
||
ID: shardID,
|
||
Nodes: make([]string, 0),
|
||
CreatedAt: time.Now(),
|
||
}
|
||
|
||
sm.shards[shardID] = shard
|
||
|
||
// Добавляем виртуальные ноды для шарда в хэш-кольцо
|
||
for j := 0; j < sm.virtualNodes; j++ {
|
||
vnodeKey := fmt.Sprintf("%s:%d", shardID, j)
|
||
hash := crc32.ChecksumIEEE([]byte(vnodeKey))
|
||
sm.hashRing = append(sm.hashRing, hash)
|
||
sm.hashToShard[hash] = shardID
|
||
}
|
||
}
|
||
|
||
// Сортируем кольцо
|
||
sort.Slice(sm.hashRing, func(i, j int) bool {
|
||
return sm.hashRing[i] < sm.hashRing[j]
|
||
})
|
||
|
||
// Распределяем шарды по узлам
|
||
sm.distributeShardsToNodes(nodes)
|
||
|
||
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
|
||
|
||
utils.PrintSuccess("Создано %d шардов с consistent hashing", numShards)
|
||
|
||
return nil
|
||
}
|
||
|
||
// createRangeBasedShards создаёт шарды на основе диапазонов
|
||
func (sm *ShardManager) createRangeBasedShards(numShards int, nodes []string) error {
|
||
// Разбиваем ключевое пространство на диапазоны
|
||
// Используем hex-строки от "00" до "ff" для простоты
|
||
|
||
totalRange := 256 // 0x00 - 0xff
|
||
rangeSize := totalRange / numShards
|
||
|
||
for i := 0; i < numShards; i++ {
|
||
start := fmt.Sprintf("%02x", i*rangeSize)
|
||
end := fmt.Sprintf("%02x", (i+1)*rangeSize-1)
|
||
|
||
if i == numShards-1 {
|
||
end = "ff"
|
||
}
|
||
|
||
shardID := fmt.Sprintf("range-shard-%d", i+1)
|
||
|
||
shard := &Shard{
|
||
ID: shardID,
|
||
Range: &KeyRange{
|
||
Start: start,
|
||
End: end,
|
||
},
|
||
Nodes: make([]string, 0),
|
||
CreatedAt: time.Now(),
|
||
}
|
||
|
||
sm.shards[shardID] = shard
|
||
}
|
||
|
||
// Распределяем шарды по узлам
|
||
sm.distributeShardsToNodes(nodes)
|
||
|
||
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
|
||
|
||
utils.PrintSuccess("Создано %d range-based шардов", numShards)
|
||
|
||
return nil
|
||
}
|
||
|
||
// createHashBasedShards создаёт шарды на основе хэша
|
||
func (sm *ShardManager) createHashBasedShards(numShards int, nodes []string) error {
|
||
for i := 0; i < numShards; i++ {
|
||
shardID := fmt.Sprintf("hash-shard-%d", i+1)
|
||
|
||
shard := &Shard{
|
||
ID: shardID,
|
||
Nodes: make([]string, 0),
|
||
CreatedAt: time.Now(),
|
||
}
|
||
|
||
sm.shards[shardID] = shard
|
||
}
|
||
|
||
// Распределяем шарды по узлам
|
||
sm.distributeShardsToNodes(nodes)
|
||
|
||
atomic.AddInt64(&sm.stats.totalShards, int64(numShards))
|
||
|
||
utils.PrintSuccess("Создано %d hash-based шардов", numShards)
|
||
|
||
return nil
|
||
}
|
||
|
||
// distributeShardsToNodes распределяет шарды по узлам
|
||
func (sm *ShardManager) distributeShardsToNodes(nodes []string) {
|
||
if len(nodes) == 0 {
|
||
return
|
||
}
|
||
|
||
nodeCount := len(nodes)
|
||
shardCount := len(sm.shards)
|
||
|
||
// Равномерно распределяем шарды по узлам
|
||
shardsPerNode := shardCount / nodeCount
|
||
remainder := shardCount % nodeCount
|
||
|
||
shardIndex := 0
|
||
shardIDs := make([]string, 0, shardCount)
|
||
for id := range sm.shards {
|
||
shardIDs = append(shardIDs, id)
|
||
}
|
||
|
||
for i, nodeID := range nodes {
|
||
numShardsForNode := shardsPerNode
|
||
if i < remainder {
|
||
numShardsForNode++
|
||
}
|
||
|
||
nodeShards := make([]string, 0)
|
||
|
||
for j := 0; j < numShardsForNode && shardIndex < len(shardIDs); j++ {
|
||
shardID := shardIDs[shardIndex]
|
||
// Добавляем узел к шарду
|
||
sm.addNodeToShard(shardID, nodeID)
|
||
nodeShards = append(nodeShards, shardID)
|
||
shardIndex++
|
||
}
|
||
|
||
sm.nodeToShards[nodeID] = nodeShards
|
||
}
|
||
}
|
||
|
||
// addNodeToShard добавляет узел к шарду
|
||
func (sm *ShardManager) addNodeToShard(shardID, nodeID string) {
|
||
shard, exists := sm.shards[shardID]
|
||
if !exists {
|
||
return
|
||
}
|
||
|
||
// Добавляем узел к шарду
|
||
shard.Nodes = append(shard.Nodes, nodeID)
|
||
|
||
// Обновляем маппинг
|
||
sm.shardToNodes[shardID] = append(sm.shardToNodes[shardID], nodeID)
|
||
|
||
// Обновляем маппинг узла к шардам
|
||
sm.nodeToShards[nodeID] = append(sm.nodeToShards[nodeID], shardID)
|
||
}
|
||
|
||
// GetShardForKey определяет шард для ключа
|
||
func (sm *ShardManager) GetShardForKey(key string) (*Shard, error) {
|
||
if !sm.enabled {
|
||
return nil, nil
|
||
}
|
||
|
||
switch sm.strategy {
|
||
case ConsistentHashing:
|
||
return sm.getShardConsistentHashing(key)
|
||
case RangeBased:
|
||
return sm.getShardRangeBased(key)
|
||
case HashBased:
|
||
return sm.getShardHashBased(key)
|
||
}
|
||
|
||
return nil, errors.New("неизвестная стратегия шардинга")
|
||
}
|
||
|
||
// getShardConsistentHashing получает шард через consistent hashing
|
||
func (sm *ShardManager) getShardConsistentHashing(key string) (*Shard, error) {
|
||
if len(sm.hashRing) == 0 {
|
||
return nil, errors.New("хэш-кольцо пусто")
|
||
}
|
||
|
||
hash := crc32.ChecksumIEEE([]byte(key))
|
||
|
||
// Бинарный поиск в кольце
|
||
idx := sort.Search(len(sm.hashRing), func(i int) bool {
|
||
return sm.hashRing[i] >= hash
|
||
})
|
||
|
||
if idx == len(sm.hashRing) {
|
||
idx = 0
|
||
}
|
||
|
||
shardID := sm.hashToShard[sm.hashRing[idx]]
|
||
shard, exists := sm.shards[shardID]
|
||
if !exists {
|
||
return nil, errors.New("шард не найден")
|
||
}
|
||
|
||
atomic.AddInt64(&shard.stats.reads, 1)
|
||
|
||
return shard, nil
|
||
}
|
||
|
||
// getShardRangeBased получает шард по диапазону
|
||
func (sm *ShardManager) getShardRangeBased(key string) (*Shard, error) {
|
||
// Вычисляем хэш ключа для определения диапазона
|
||
hash := md5.Sum([]byte(key))
|
||
keyHex := hex.EncodeToString(hash[:1]) // Используем первый байт
|
||
|
||
for _, shard := range sm.shards {
|
||
if shard.Range != nil {
|
||
if keyHex >= shard.Range.Start && keyHex <= shard.Range.End {
|
||
atomic.AddInt64(&shard.stats.reads, 1)
|
||
return shard, nil
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil, errors.New("шард не найден для ключа")
|
||
}
|
||
|
||
// getShardHashBased получает шард по хэшу
|
||
func (sm *ShardManager) getShardHashBased(key string) (*Shard, error) {
|
||
hash := crc32.ChecksumIEEE([]byte(key))
|
||
shardIndex := int(hash) % len(sm.shards)
|
||
|
||
// Получаем отсортированный список ID шардов
|
||
shardIDs := make([]string, 0, len(sm.shards))
|
||
for id := range sm.shards {
|
||
shardIDs = append(shardIDs, id)
|
||
}
|
||
sort.Strings(shardIDs)
|
||
|
||
if shardIndex >= 0 && shardIndex < len(shardIDs) {
|
||
shardID := shardIDs[shardIndex]
|
||
shard, exists := sm.shards[shardID]
|
||
if exists {
|
||
atomic.AddInt64(&shard.stats.reads, 1)
|
||
return shard, nil
|
||
}
|
||
}
|
||
|
||
return nil, errors.New("шард не найден")
|
||
}
|
||
|
||
// GetShardByID возвращает шард по ID
|
||
func (sm *ShardManager) GetShardByID(shardID string) (*Shard, bool) {
|
||
shard, exists := sm.shards[shardID]
|
||
return shard, exists
|
||
}
|
||
|
||
// RecordWrite записывает статистику записи в шард
|
||
func (sm *ShardManager) RecordWrite(shardID string) {
|
||
if !sm.enabled {
|
||
return
|
||
}
|
||
|
||
shard, exists := sm.shards[shardID]
|
||
if exists {
|
||
atomic.AddInt64(&shard.stats.writes, 1)
|
||
}
|
||
}
|
||
|
||
// Rebalance выполняет ребалансировку шардов
|
||
func (sm *ShardManager) Rebalance() error {
|
||
if !sm.enabled {
|
||
return nil
|
||
}
|
||
|
||
utils.PrintInfo("Запуск ребалансировки шардов...")
|
||
|
||
atomic.AddInt64(&sm.stats.rebalances, 1)
|
||
|
||
// Здесь должна быть логика ребалансировки
|
||
// Перемещение шардов между узлами для равномерной загрузки
|
||
|
||
for _, shard := range sm.shards {
|
||
atomic.AddInt64(&shard.stats.rebalances, 1)
|
||
}
|
||
|
||
atomic.AddInt64(&sm.stats.totalMoves, int64(len(sm.shards)/2))
|
||
|
||
utils.PrintSuccess("Ребалансировка завершена")
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetShardStats возвращает статистику шардов
|
||
func (sm *ShardManager) GetShardStats() map[string]interface{} {
|
||
stats := make(map[string]interface{})
|
||
|
||
stats["enabled"] = sm.enabled
|
||
stats["strategy"] = sm.getStrategyName()
|
||
stats["total_shards"] = atomic.LoadInt64(&sm.stats.totalShards)
|
||
stats["total_rebalances"] = atomic.LoadInt64(&sm.stats.rebalances)
|
||
stats["total_moves"] = atomic.LoadInt64(&sm.stats.totalMoves)
|
||
|
||
shardStats := make([]map[string]interface{}, 0)
|
||
|
||
for id, shard := range sm.shards {
|
||
sStats := map[string]interface{}{
|
||
"id": id,
|
||
"nodes": shard.Nodes,
|
||
"reads": atomic.LoadInt64(&shard.stats.reads),
|
||
"writes": atomic.LoadInt64(&shard.stats.writes),
|
||
"rebalances": atomic.LoadInt64(&shard.stats.rebalances),
|
||
"created": shard.CreatedAt.Format(time.RFC3339),
|
||
}
|
||
|
||
if shard.Range != nil {
|
||
sStats["range_start"] = shard.Range.Start
|
||
sStats["range_end"] = shard.Range.End
|
||
}
|
||
|
||
shardStats = append(shardStats, sStats)
|
||
}
|
||
|
||
stats["shards"] = shardStats
|
||
|
||
return stats
|
||
}
|