289 lines
8.3 KiB
Go
Raw Normal View History

2026-02-27 22:04:04 +03:00
// /futriis/internal/storage/tuple.go
// Пакет storage реализует операции с кортежами (записями) - базовыми единицами данных.
// TupleManager предоставляет wait-free операции создания, чтения, обновления и удаления кортежей
// с использованием атомарных счётчиков для статистики и поддержкой индексов.
package storage
import (
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
"unsafe"
"futriis/pkg/types"
"futriis/pkg/utils"
)
// TupleManager управляет операциями с кортежами
type TupleManager struct {
stats struct {
created int64
updated int64
deleted int64
read int64
}
columnCompressors map[string]*ColumnCompressor // Имя колонки -> компрессор
// Добавляем мьютекс для синхронизации доступа к кортежам
mu sync.RWMutex
}
// NewTupleManager создаёт новый менеджер кортежей
func NewTupleManager() *TupleManager {
return &TupleManager{
columnCompressors: make(map[string]*ColumnCompressor),
}
}
// вспомогательная функция для доступа к неэкспортируемому полю tuples в Slice
func getTuplesFromSlice(slice *types.Slice) map[string]*types.Tuple {
v := reflect.ValueOf(slice).Elem()
field := v.FieldByName("tuples")
if field.IsValid() && field.Kind() == reflect.Map {
result := make(map[string]*types.Tuple)
iter := field.MapRange()
for iter.Next() {
key := iter.Key().String()
value := iter.Value().Interface()
if tuple, ok := value.(*types.Tuple); ok {
result[key] = tuple
}
}
return result
}
return make(map[string]*types.Tuple)
}
// вспомогательная функция для добавления кортежа в неэкспортируемое поле tuples
func addTupleToSlice(slice *types.Slice, id string, tuple *types.Tuple) error {
v := reflect.ValueOf(slice).Elem()
field := v.FieldByName("tuples")
if field.IsValid() && field.Kind() == reflect.Map {
if field.IsNil() {
newMap := reflect.MakeMap(reflect.TypeOf(map[string]*types.Tuple{}))
field.Set(newMap)
}
key := reflect.ValueOf(id)
value := reflect.ValueOf(tuple)
field.SetMapIndex(key, value)
return nil
}
return errors.New("cannot access tuples field in slice")
}
// вспомогательная функция для удаления кортежа из неэкспортируемого поля tuples
func removeTupleFromSlice(slice *types.Slice, id string) error {
v := reflect.ValueOf(slice).Elem()
field := v.FieldByName("tuples")
if field.IsValid() && field.Kind() == reflect.Map {
key := reflect.ValueOf(id)
field.SetMapIndex(key, reflect.Value{})
return nil
}
return errors.New("cannot access tuples field in slice")
}
// вспомогательная функция для проверки существования кортежа
func tupleExistsInSlice(slice *types.Slice, id string) bool {
tuples := getTuplesFromSlice(slice)
_, exists := tuples[id]
return exists
}
// CreateTuple создаёт новый кортеж в указанном слайсе с временной меткой
// Wait-free операция: использует атомарные операции для счётчиков
func (tm *TupleManager) CreateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
if slice == nil {
return nil, errors.New("slice is nil")
}
tm.mu.Lock()
defer tm.mu.Unlock()
// Проверяем существование кортежа
if tupleExistsInSlice(slice, id) {
return nil, errors.New("tuple already exists")
}
// Создаём новый кортеж с временной меткой
tuple := types.NewTuple(id)
tuple.CreatedAt = time.Now()
tuple.UpdatedAt = time.Now()
for k, v := range fields {
tuple.Fields[k] = v
// Создаём компрессор для колонки, если его нет
if _, ok := tm.columnCompressors[k]; !ok {
colType := getFieldType(v)
tm.columnCompressors[k] = NewColumnCompressor(colType)
}
}
// Добавляем в слайс
err := addTupleToSlice(slice, id, tuple)
if err != nil {
return nil, err
}
// Обновляем индексы, если они есть
// Получаем доступ к индексам через таппл
// В реальном коде здесь должна быть ссылка на IndexManager
atomic.AddInt64(&tm.stats.created, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", fmt.Sprintf("Created tuple: %s at %s", id, tuple.CreatedAt.Format(time.RFC3339)))
}
return tuple, nil
}
// ReadTuple читает кортеж по ID
// Wait-free операция для чтения
func (tm *TupleManager) ReadTuple(slice *types.Slice, id string) (*types.Tuple, error) {
if slice == nil {
return nil, errors.New("slice is nil")
}
tm.mu.RLock()
defer tm.mu.RUnlock()
tuples := getTuplesFromSlice(slice)
tuple, exists := tuples[id]
if !exists {
return nil, errors.New("tuple not found")
}
atomic.AddInt64(&tm.stats.read, 1)
return tuple, nil
}
// UpdateTuple обновляет поля кортежа с обновлением временной метки
func (tm *TupleManager) UpdateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
if slice == nil {
return nil, errors.New("slice is nil")
}
tm.mu.Lock()
defer tm.mu.Unlock()
tuples := getTuplesFromSlice(slice)
tuple, exists := tuples[id]
if !exists {
return nil, errors.New("tuple not found")
}
// Обновляем поля
for k, v := range fields {
tuple.Fields[k] = v
}
// Обновляем временную метку
tuple.UpdatedAt = time.Now()
atomic.AddInt64(&tm.stats.updated, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", fmt.Sprintf("Updated tuple: %s at %s", id, tuple.UpdatedAt.Format(time.RFC3339)))
}
return tuple, nil
}
// DeleteTuple удаляет кортеж
func (tm *TupleManager) DeleteTuple(slice *types.Slice, id string) error {
if slice == nil {
return errors.New("slice is nil")
}
tm.mu.Lock()
defer tm.mu.Unlock()
if !tupleExistsInSlice(slice, id) {
return errors.New("tuple not found")
}
err := removeTupleFromSlice(slice, id)
if err != nil {
return err
}
atomic.AddInt64(&tm.stats.deleted, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Deleted tuple: "+id)
}
return nil
}
// FindTuplesByIndex выполняет поиск кортежей по индексу
func (tm *TupleManager) FindTuplesByIndex(index *Index, key string) ([]unsafe.Pointer, error) {
if index == nil {
return nil, errors.New("index is nil")
}
ptr, exists := index.Lookup(key)
if !exists {
return nil, nil
}
return []unsafe.Pointer{ptr}, nil
}
// CompressColumn сжимает данные колонки
func (tm *TupleManager) CompressColumn(columnName string, data []interface{}) ([]byte, error) {
compressor, exists := tm.columnCompressors[columnName]
if !exists {
// Создаём компрессор по умолчанию
compressor = NewColumnCompressor("unknown")
tm.columnCompressors[columnName] = compressor
}
return compressor.Compress(data)
}
// GetCompressionStats возвращает статистику сжатия
func (tm *TupleManager) GetCompressionStats() map[string]interface{} {
stats := make(map[string]interface{})
for colName, compressor := range tm.columnCompressors {
stats[colName] = compressor.GetStats()
}
return stats
}
// getFieldType определяет тип поля
func getFieldType(v interface{}) string {
switch v.(type) {
case int, int64, int32:
return "int"
case float32, float64:
return "float64"
case string:
return "string"
case bool:
return "bool"
default:
return "unknown"
}
}
// GetStats возвращает статистику операций
func (tm *TupleManager) GetStats() map[string]int64 {
return map[string]int64{
"created": atomic.LoadInt64(&tm.stats.created),
"updated": atomic.LoadInt64(&tm.stats.updated),
"deleted": atomic.LoadInt64(&tm.stats.deleted),
"read": atomic.LoadInt64(&tm.stats.read),
}
}