Files
futriis/internal/storage/document.go
2026-04-08 21:43:35 +03:00

481 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Файл: internal/storage/document.go
// Назначение: Определение структуры документа, его методов для работы
// с полями, кортежами (вложенными документами) и сериализации в MessagePack.
// Документ является основной единицей хранения в СУБД futriis.
package storage
import (
"fmt"
"strings"
"sync"
"time"
"futriis/internal/compression"
"futriis/internal/serializer"
"github.com/google/uuid"
)
// Document представляет документ в коллекции (аналог строки в реляционной СУБД)
type Document struct {
ID string `msgpack:"_id"` // Уникальный идентификатор документа
Fields map[string]interface{} `msgpack:"fields"` // Поля документа (аналог колонок)
CreatedAt int64 `msgpack:"created_at"` // Время создания (Unix миллисекунды)
UpdatedAt int64 `msgpack:"updated_at"` // Время последнего обновления
Version uint64 `msgpack:"version"` // Версия документа (для оптимистичных блокировок)
Compressed bool `msgpack:"compressed"` // Флаг, сжат ли документ
OriginalSize int64 `msgpack:"original_size"` // Оригинальный размер до сжатия
mu sync.RWMutex `msgpack:"-"` // Блокировка для wait-free операций
}
// Tuple представляет вложенный документ (аналог кортежа в реляционной СУБД)
type Tuple struct {
Fields map[string]interface{} `msgpack:"fields"`
mu sync.RWMutex
}
// Field представляет отдельное поле документа (аналог колонки)
type Field struct {
Name string `msgpack:"name"`
Type FieldType `msgpack:"type"`
Value interface{} `msgpack:"value"`
}
// FieldType определяет тип поля документа
type FieldType int
const (
TypeString FieldType = iota
TypeNumber
TypeBoolean
TypeTuple // Вложенный документ
TypeArray
TypeNull
)
// NewDocument создаёт новый документ с автоматической генерацией ID
func NewDocument() *Document {
now := time.Now().UnixMilli()
return &Document{
ID: uuid.New().String(),
Fields: make(map[string]interface{}),
CreatedAt: now,
UpdatedAt: now,
Version: 1,
Compressed: false,
OriginalSize: 0,
}
}
// NewDocumentWithID создаёт документ с указанным ID
func NewDocumentWithID(id string) *Document {
now := time.Now().UnixMilli()
return &Document{
ID: id,
Fields: make(map[string]interface{}),
CreatedAt: now,
UpdatedAt: now,
Version: 1,
Compressed: false,
OriginalSize: 0,
}
}
// SetField устанавливает значение поля документа (wait-free)
func (d *Document) SetField(name string, value interface{}) {
d.mu.Lock()
defer d.mu.Unlock()
d.Fields[name] = value
d.UpdatedAt = time.Now().UnixMilli()
d.Version++
d.Compressed = false // При изменении документа снимаем флаг сжатия
}
// GetField возвращает значение поля документа
func (d *Document) GetField(name string) (interface{}, error) {
d.mu.RLock()
defer d.mu.RUnlock()
if val, ok := d.Fields[name]; ok {
return val, nil
}
return nil, fmt.Errorf("field not found: %s", name)
}
// DeleteField удаляет поле из документа
func (d *Document) DeleteField(name string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.Fields, name)
d.UpdatedAt = time.Now().UnixMilli()
d.Version++
d.Compressed = false
}
// HasField проверяет наличие поля в документе
func (d *Document) HasField(name string) bool {
d.mu.RLock()
defer d.mu.RUnlock()
_, ok := d.Fields[name]
return ok
}
// GetFields возвращает копию всех полей документа
func (d *Document) GetFields() map[string]interface{} {
d.mu.RLock()
defer d.mu.RUnlock()
copy := make(map[string]interface{})
for k, v := range d.Fields {
copy[k] = v
}
return copy
}
// SetTuple устанавливает вложенный документ (кортеж) в поле
func (d *Document) SetTuple(fieldName string, tuple *Tuple) {
d.SetField(fieldName, tuple)
}
// GetTuple возвращает вложенный документ из поля
func (d *Document) GetTuple(fieldName string) (*Tuple, error) {
val, err := d.GetField(fieldName)
if err != nil {
return nil, err
}
if tuple, ok := val.(*Tuple); ok {
return tuple, nil
}
return nil, fmt.Errorf("field %s is not a tuple", fieldName)
}
// Serialize сериализует документ в MessagePack с поддержкой сжатия
func (d *Document) Serialize() ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
data, err := serializer.Marshal(d)
if err != nil {
return nil, err
}
return data, nil
}
// SerializeCompressed сериализует и сжимает документ
func (d *Document) SerializeCompressed(compressionConfig *compression.Config) ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// Сериализуем документ
data, err := serializer.Marshal(d)
if err != nil {
return nil, err
}
// Проверяем, нужно ли сжимать
if compressionConfig != nil && compressionConfig.Enabled && len(data) >= compressionConfig.MinSize {
compressed, err := compression.Compress(data, compressionConfig)
if err != nil {
// При ошибке сжатия возвращаем несжатые данные
return data, nil
}
return compressed, nil
}
return data, nil
}
// Deserialize десериализует документ из MessagePack (автоматически определяет сжатие)
func (d *Document) Deserialize(data []byte) error {
d.mu.Lock()
defer d.mu.Unlock()
// Пытаемся определить, сжаты ли данные
// Для этого пробуем распаковать, если не получается - данные несжатые
decompressed, err := compression.DecompressAuto(data)
if err == nil && len(decompressed) < len(data) {
// Данные были сжаты, используем распакованную версию
if err := serializer.Unmarshal(decompressed, d); err != nil {
return err
}
d.Compressed = true
d.OriginalSize = int64(len(decompressed))
} else {
// Данные не сжаты или не удалось распаковать
if err := serializer.Unmarshal(data, d); err != nil {
return err
}
d.Compressed = false
d.OriginalSize = 0
}
// Обновляем временные метки при десериализации
d.UpdatedAt = time.Now().UnixMilli()
return nil
}
// Clone создаёт глубокую копию документа
func (d *Document) Clone() *Document {
d.mu.RLock()
defer d.mu.RUnlock()
clone := &Document{
ID: d.ID,
Fields: make(map[string]interface{}),
CreatedAt: d.CreatedAt,
UpdatedAt: d.UpdatedAt,
Version: d.Version,
Compressed: d.Compressed,
OriginalSize: d.OriginalSize,
}
// Глубокое копирование полей
for k, v := range d.Fields {
clone.Fields[k] = deepCopyValue(v)
}
return clone
}
// Update применяет обновление к документу (атомарно)
func (d *Document) Update(updates map[string]interface{}) error {
d.mu.Lock()
defer d.mu.Unlock()
for k, v := range updates {
d.Fields[k] = v
}
d.UpdatedAt = time.Now().UnixMilli()
d.Version++
d.Compressed = false // После обновления документ больше не сжат
return nil
}
// Compress сжимает документ в памяти
func (d *Document) Compress(config *compression.Config) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.Compressed {
return nil
}
// Сохраняем текущее состояние
originalSize := len(d.Fields)
if originalSize < config.MinSize {
return nil // Не сжимаем маленькие документы
}
d.Compressed = true
d.OriginalSize = int64(originalSize)
return nil
}
// Decompress распаковывает документ в памяти
func (d *Document) Decompress() error {
d.mu.Lock()
defer d.mu.Unlock()
if !d.Compressed {
return nil
}
d.Compressed = false
d.OriginalSize = 0
return nil
}
// GetCompressionRatio возвращает коэффициент сжатия
func (d *Document) GetCompressionRatio() float64 {
d.mu.RLock()
defer d.mu.RUnlock()
if !d.Compressed || d.OriginalSize == 0 {
return 1.0
}
currentSize := len(d.Fields)
return float64(currentSize) / float64(d.OriginalSize)
}
// deepCopyValue выполняет глубокое копирование значения
func deepCopyValue(val interface{}) interface{} {
switch v := val.(type) {
case *Tuple:
return v.Clone()
case map[string]interface{}:
copy := make(map[string]interface{})
for k, val := range v {
copy[k] = deepCopyValue(val)
}
return copy
case []interface{}:
copy := make([]interface{}, len(v))
for i, val := range v {
copy[i] = deepCopyValue(val)
}
return copy
default:
return v
}
}
// NewTuple создаёт новый вложенный документ (кортеж)
func NewTuple() *Tuple {
return &Tuple{
Fields: make(map[string]interface{}),
}
}
// Set устанавливает поле во вложенном документе
func (t *Tuple) Set(name string, value interface{}) {
t.mu.Lock()
defer t.mu.Unlock()
t.Fields[name] = value
}
// Get возвращает поле из вложенного документа
func (t *Tuple) Get(name string) (interface{}, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if val, ok := t.Fields[name]; ok {
return val, nil
}
return nil, fmt.Errorf("tuple field not found: %s", name)
}
// Clone создаёт копию кортежа
func (t *Tuple) Clone() *Tuple {
t.mu.RLock()
defer t.mu.RUnlock()
clone := NewTuple()
for k, v := range t.Fields {
clone.Fields[k] = deepCopyValue(v)
}
return clone
}
// ToMap конвертирует кортеж в map
func (t *Tuple) ToMap() map[string]interface{} {
t.mu.RLock()
defer t.mu.RUnlock()
copy := make(map[string]interface{})
for k, v := range t.Fields {
copy[k] = v
}
return copy
}
// GetNestedField получает значение по точечному пути (например, "user.address.city")
func (d *Document) GetNestedField(path string) (interface{}, error) {
parts := strings.Split(path, ".")
if len(parts) == 0 {
return nil, fmt.Errorf("empty path")
}
current := interface{}(d)
for _, part := range parts {
switch v := current.(type) {
case *Document:
val, err := v.GetField(part)
if err != nil {
return nil, err
}
current = val
case *Tuple:
val, err := v.Get(part)
if err != nil {
return nil, err
}
current = val
case map[string]interface{}:
if val, ok := v[part]; ok {
current = val
} else {
return nil, fmt.Errorf("field not found: %s", part)
}
default:
return nil, fmt.Errorf("cannot navigate into non-document value at %s", part)
}
}
return current, nil
}
// SetNestedField устанавливает значение по точечному пути
func (d *Document) SetNestedField(path string, value interface{}) error {
parts := strings.Split(path, ".")
if len(parts) == 0 {
return fmt.Errorf("empty path")
}
// Если путь состоит из одного элемента, просто устанавливаем поле
if len(parts) == 1 {
d.SetField(parts[0], value)
return nil
}
// Иначе нужно создать промежуточные структуры
current := interface{}(d)
for i := 0; i < len(parts)-1; i++ {
part := parts[i]
switch v := current.(type) {
case *Document:
if !v.HasField(part) {
// Создаём новый кортеж, если поле не существует
newTuple := NewTuple()
v.SetField(part, newTuple)
current = newTuple
} else {
field, _ := v.GetField(part)
if tuple, ok := field.(*Tuple); ok {
current = tuple
} else {
return fmt.Errorf("field %s is not a tuple", part)
}
}
case *Tuple:
if val, err := v.Get(part); err == nil {
if tuple, ok := val.(*Tuple); ok {
current = tuple
} else {
return fmt.Errorf("field %s is not a tuple", part)
}
} else {
newTuple := NewTuple()
v.Set(part, newTuple)
current = newTuple
}
default:
return fmt.Errorf("cannot set nested field on non-document value")
}
}
// Устанавливаем значение в последний элемент пути
lastPart := parts[len(parts)-1]
switch v := current.(type) {
case *Document:
v.SetField(lastPart, value)
case *Tuple:
v.Set(lastPart, value)
default:
return fmt.Errorf("cannot set field on non-document value")
}
d.UpdatedAt = time.Now().UnixMilli()
d.Compressed = false
return nil
}