futriis/internal/storage/compression.go
2026-02-27 22:04:04 +03:00

241 lines
6.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// /futriis/internal/storage/compression.go
// Пакет storage реализует простейшее сжатие для колонок с одинаковыми типами данных
package storage
import (
"encoding/binary"
"math"
"strconv"
"sync/atomic"
)
// CompressionType тип сжатия
type CompressionType int
const (
NoCompression CompressionType = iota
RLECompression // Run-length encoding для повторяющихся значений
DeltaCompression // Дельта-сжатие для чисел
DictionaryCompression // Словарное сжатие для строк
)
// ColumnCompressor предоставляет сжатие для колонки
type ColumnCompressor struct {
colType string
compType CompressionType
stats struct {
originalSize int64
compressedSize int64
savings int64
}
}
// NewColumnCompressor создаёт новый компрессор для колонки
func NewColumnCompressor(colType string) *ColumnCompressor {
cc := &ColumnCompressor{
colType: colType,
compType: NoCompression,
}
// Автоматически выбираем тип сжатия на основе типа данных
switch colType {
case "int", "int64", "float64":
cc.compType = DeltaCompression
case "string":
cc.compType = DictionaryCompression
default:
cc.compType = RLECompression
}
return cc
}
// Compress сжимает данные
func (cc *ColumnCompressor) Compress(data []interface{}) ([]byte, error) {
var compressed []byte
var err error
switch cc.compType {
case RLECompression:
compressed, err = cc.rleCompress(data)
case DeltaCompression:
compressed, err = cc.deltaCompress(data)
case DictionaryCompression:
compressed, err = cc.dictionaryCompress(data)
default:
// Без сжатия - просто сериализуем
compressed, err = cc.noCompress(data)
}
if err != nil {
return nil, err
}
// Обновляем статистику
originalSize := int64(len(data) * 8) // Примерная оценка
compressedSize := int64(len(compressed))
atomic.AddInt64(&cc.stats.originalSize, originalSize)
atomic.AddInt64(&cc.stats.compressedSize, compressedSize)
atomic.AddInt64(&cc.stats.savings, originalSize-compressedSize)
return compressed, nil
}
// rleCompress реализует сжатие повторяющихся значений
func (cc *ColumnCompressor) rleCompress(data []interface{}) ([]byte, error) {
if len(data) == 0 {
return []byte{}, nil
}
result := make([]byte, 0)
current := data[0]
count := 1
for i := 1; i < len(data); i++ {
if data[i] == current {
count++
} else {
// Записываем значение и счётчик
result = append(result, []byte(encodeValue(current))...)
result = append(result, byte(count))
current = data[i]
count = 1
}
}
// Записываем последнее значение
result = append(result, []byte(encodeValue(current))...)
result = append(result, byte(count))
return result, nil
}
// deltaCompress реализует дельта-сжатие для чисел
func (cc *ColumnCompressor) deltaCompress(data []interface{}) ([]byte, error) {
if len(data) == 0 {
return []byte{}, nil
}
result := make([]byte, 8) // Первое значение храним полностью
// Преобразуем первое значение
first, ok := data[0].(float64)
if !ok {
if i, ok := data[0].(int); ok {
first = float64(i)
} else {
return cc.noCompress(data)
}
}
binary.LittleEndian.PutUint64(result, math.Float64bits(first))
// Для остальных храним дельты
for i := 1; i < len(data); i++ {
var curr float64
switch v := data[i].(type) {
case float64:
curr = v
case int:
curr = float64(v)
default:
return cc.noCompress(data)
}
prev, _ := data[i-1].(float64)
if iPrev, ok := data[i-1].(int); ok {
prev = float64(iPrev)
}
delta := int16(curr - prev)
deltaBytes := make([]byte, 2)
binary.LittleEndian.PutUint16(deltaBytes, uint16(delta))
result = append(result, deltaBytes...)
}
return result, nil
}
// dictionaryCompress реализует словарное сжатие для строк
func (cc *ColumnCompressor) dictionaryCompress(data []interface{}) ([]byte, error) {
// Строим словарь уникальных значений
dict := make(map[string]byte)
values := make([]byte, len(data))
nextCode := byte(0)
for i, val := range data {
str, ok := val.(string)
if !ok {
return cc.noCompress(data)
}
code, exists := dict[str]
if !exists {
code = nextCode
dict[str] = code
nextCode++
}
values[i] = code
}
// Кодируем: сначала словарь, затем значения
result := make([]byte, 0)
// Записываем размер словаря
result = append(result, byte(len(dict)))
// Записываем словарь
for str, code := range dict {
result = append(result, code)
result = append(result, byte(len(str)))
result = append(result, []byte(str)...)
}
// Записываем значения
result = append(result, values...)
return result, nil
}
// noCompress без сжатия
func (cc *ColumnCompressor) noCompress(data []interface{}) ([]byte, error) {
result := make([]byte, 0)
for _, val := range data {
result = append(result, []byte(encodeValue(val))...)
}
return result, nil
}
// encodeValue кодирует значение в строку
func encodeValue(val interface{}) string {
switch v := val.(type) {
case string:
return v
case int:
return strconv.Itoa(v)
case int64:
return strconv.FormatInt(v, 10)
case float64:
return strconv.FormatFloat(v, 'f', -1, 64)
case bool:
return strconv.FormatBool(v)
default:
return ""
}
}
// GetStats возвращает статистику сжатия
func (cc *ColumnCompressor) GetStats() map[string]int64 {
return map[string]int64{
"original_size": atomic.LoadInt64(&cc.stats.originalSize),
"compressed_size": atomic.LoadInt64(&cc.stats.compressedSize),
"savings": atomic.LoadInt64(&cc.stats.savings),
}
}