289 lines
8.3 KiB
Go
289 lines
8.3 KiB
Go
// /futriis/internal/storage/tuple.go
|
||
// Пакет storage реализует операции с кортежами (записями) - базовыми единицами данных.
|
||
// TupleManager предоставляет wait-free операции создания, чтения, обновления и удаления кортежей
|
||
// с использованием атомарных счётчиков для статистики и поддержкой индексов.
|
||
|
||
package storage
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"reflect"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
"unsafe"
|
||
|
||
"futriis/pkg/types"
|
||
"futriis/pkg/utils"
|
||
)
|
||
|
||
// TupleManager управляет операциями с кортежами
|
||
type TupleManager struct {
|
||
stats struct {
|
||
created int64
|
||
updated int64
|
||
deleted int64
|
||
read int64
|
||
}
|
||
columnCompressors map[string]*ColumnCompressor // Имя колонки -> компрессор
|
||
// Добавляем мьютекс для синхронизации доступа к кортежам
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewTupleManager создаёт новый менеджер кортежей
|
||
func NewTupleManager() *TupleManager {
|
||
return &TupleManager{
|
||
columnCompressors: make(map[string]*ColumnCompressor),
|
||
}
|
||
}
|
||
|
||
// вспомогательная функция для доступа к неэкспортируемому полю tuples в Slice
|
||
func getTuplesFromSlice(slice *types.Slice) map[string]*types.Tuple {
|
||
v := reflect.ValueOf(slice).Elem()
|
||
field := v.FieldByName("tuples")
|
||
if field.IsValid() && field.Kind() == reflect.Map {
|
||
result := make(map[string]*types.Tuple)
|
||
iter := field.MapRange()
|
||
for iter.Next() {
|
||
key := iter.Key().String()
|
||
value := iter.Value().Interface()
|
||
if tuple, ok := value.(*types.Tuple); ok {
|
||
result[key] = tuple
|
||
}
|
||
}
|
||
return result
|
||
}
|
||
return make(map[string]*types.Tuple)
|
||
}
|
||
|
||
// вспомогательная функция для добавления кортежа в неэкспортируемое поле tuples
|
||
func addTupleToSlice(slice *types.Slice, id string, tuple *types.Tuple) error {
|
||
v := reflect.ValueOf(slice).Elem()
|
||
field := v.FieldByName("tuples")
|
||
if field.IsValid() && field.Kind() == reflect.Map {
|
||
if field.IsNil() {
|
||
newMap := reflect.MakeMap(reflect.TypeOf(map[string]*types.Tuple{}))
|
||
field.Set(newMap)
|
||
}
|
||
key := reflect.ValueOf(id)
|
||
value := reflect.ValueOf(tuple)
|
||
field.SetMapIndex(key, value)
|
||
return nil
|
||
}
|
||
return errors.New("cannot access tuples field in slice")
|
||
}
|
||
|
||
// вспомогательная функция для удаления кортежа из неэкспортируемого поля tuples
|
||
func removeTupleFromSlice(slice *types.Slice, id string) error {
|
||
v := reflect.ValueOf(slice).Elem()
|
||
field := v.FieldByName("tuples")
|
||
if field.IsValid() && field.Kind() == reflect.Map {
|
||
key := reflect.ValueOf(id)
|
||
field.SetMapIndex(key, reflect.Value{})
|
||
return nil
|
||
}
|
||
return errors.New("cannot access tuples field in slice")
|
||
}
|
||
|
||
// вспомогательная функция для проверки существования кортежа
|
||
func tupleExistsInSlice(slice *types.Slice, id string) bool {
|
||
tuples := getTuplesFromSlice(slice)
|
||
_, exists := tuples[id]
|
||
return exists
|
||
}
|
||
|
||
// CreateTuple создаёт новый кортеж в указанном слайсе с временной меткой
|
||
// Wait-free операция: использует атомарные операции для счётчиков
|
||
func (tm *TupleManager) CreateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
|
||
if slice == nil {
|
||
return nil, errors.New("slice is nil")
|
||
}
|
||
|
||
tm.mu.Lock()
|
||
defer tm.mu.Unlock()
|
||
|
||
// Проверяем существование кортежа
|
||
if tupleExistsInSlice(slice, id) {
|
||
return nil, errors.New("tuple already exists")
|
||
}
|
||
|
||
// Создаём новый кортеж с временной меткой
|
||
tuple := types.NewTuple(id)
|
||
tuple.CreatedAt = time.Now()
|
||
tuple.UpdatedAt = time.Now()
|
||
|
||
for k, v := range fields {
|
||
tuple.Fields[k] = v
|
||
|
||
// Создаём компрессор для колонки, если его нет
|
||
if _, ok := tm.columnCompressors[k]; !ok {
|
||
colType := getFieldType(v)
|
||
tm.columnCompressors[k] = NewColumnCompressor(colType)
|
||
}
|
||
}
|
||
|
||
// Добавляем в слайс
|
||
err := addTupleToSlice(slice, id, tuple)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Обновляем индексы, если они есть
|
||
// Получаем доступ к индексам через таппл
|
||
// В реальном коде здесь должна быть ссылка на IndexManager
|
||
|
||
atomic.AddInt64(&tm.stats.created, 1)
|
||
|
||
logger := utils.GetLogger()
|
||
if logger != nil {
|
||
logger.Log("INFO", fmt.Sprintf("Created tuple: %s at %s", id, tuple.CreatedAt.Format(time.RFC3339)))
|
||
}
|
||
|
||
return tuple, nil
|
||
}
|
||
|
||
// ReadTuple читает кортеж по ID
|
||
// Wait-free операция для чтения
|
||
func (tm *TupleManager) ReadTuple(slice *types.Slice, id string) (*types.Tuple, error) {
|
||
if slice == nil {
|
||
return nil, errors.New("slice is nil")
|
||
}
|
||
|
||
tm.mu.RLock()
|
||
defer tm.mu.RUnlock()
|
||
|
||
tuples := getTuplesFromSlice(slice)
|
||
tuple, exists := tuples[id]
|
||
if !exists {
|
||
return nil, errors.New("tuple not found")
|
||
}
|
||
|
||
atomic.AddInt64(&tm.stats.read, 1)
|
||
return tuple, nil
|
||
}
|
||
|
||
// UpdateTuple обновляет поля кортежа с обновлением временной метки
|
||
func (tm *TupleManager) UpdateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
|
||
if slice == nil {
|
||
return nil, errors.New("slice is nil")
|
||
}
|
||
|
||
tm.mu.Lock()
|
||
defer tm.mu.Unlock()
|
||
|
||
tuples := getTuplesFromSlice(slice)
|
||
tuple, exists := tuples[id]
|
||
if !exists {
|
||
return nil, errors.New("tuple not found")
|
||
}
|
||
|
||
// Обновляем поля
|
||
for k, v := range fields {
|
||
tuple.Fields[k] = v
|
||
}
|
||
|
||
// Обновляем временную метку
|
||
tuple.UpdatedAt = time.Now()
|
||
|
||
atomic.AddInt64(&tm.stats.updated, 1)
|
||
|
||
logger := utils.GetLogger()
|
||
if logger != nil {
|
||
logger.Log("INFO", fmt.Sprintf("Updated tuple: %s at %s", id, tuple.UpdatedAt.Format(time.RFC3339)))
|
||
}
|
||
|
||
return tuple, nil
|
||
}
|
||
|
||
// DeleteTuple удаляет кортеж
|
||
func (tm *TupleManager) DeleteTuple(slice *types.Slice, id string) error {
|
||
if slice == nil {
|
||
return errors.New("slice is nil")
|
||
}
|
||
|
||
tm.mu.Lock()
|
||
defer tm.mu.Unlock()
|
||
|
||
if !tupleExistsInSlice(slice, id) {
|
||
return errors.New("tuple not found")
|
||
}
|
||
|
||
err := removeTupleFromSlice(slice, id)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
atomic.AddInt64(&tm.stats.deleted, 1)
|
||
|
||
logger := utils.GetLogger()
|
||
if logger != nil {
|
||
logger.Log("INFO", "Deleted tuple: "+id)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// FindTuplesByIndex выполняет поиск кортежей по индексу
|
||
func (tm *TupleManager) FindTuplesByIndex(index *Index, key string) ([]unsafe.Pointer, error) {
|
||
if index == nil {
|
||
return nil, errors.New("index is nil")
|
||
}
|
||
|
||
ptr, exists := index.Lookup(key)
|
||
if !exists {
|
||
return nil, nil
|
||
}
|
||
|
||
return []unsafe.Pointer{ptr}, nil
|
||
}
|
||
|
||
// CompressColumn сжимает данные колонки
|
||
func (tm *TupleManager) CompressColumn(columnName string, data []interface{}) ([]byte, error) {
|
||
compressor, exists := tm.columnCompressors[columnName]
|
||
if !exists {
|
||
// Создаём компрессор по умолчанию
|
||
compressor = NewColumnCompressor("unknown")
|
||
tm.columnCompressors[columnName] = compressor
|
||
}
|
||
|
||
return compressor.Compress(data)
|
||
}
|
||
|
||
// GetCompressionStats возвращает статистику сжатия
|
||
func (tm *TupleManager) GetCompressionStats() map[string]interface{} {
|
||
stats := make(map[string]interface{})
|
||
|
||
for colName, compressor := range tm.columnCompressors {
|
||
stats[colName] = compressor.GetStats()
|
||
}
|
||
|
||
return stats
|
||
}
|
||
|
||
// getFieldType определяет тип поля
|
||
func getFieldType(v interface{}) string {
|
||
switch v.(type) {
|
||
case int, int64, int32:
|
||
return "int"
|
||
case float32, float64:
|
||
return "float64"
|
||
case string:
|
||
return "string"
|
||
case bool:
|
||
return "bool"
|
||
default:
|
||
return "unknown"
|
||
}
|
||
}
|
||
|
||
// GetStats возвращает статистику операций
|
||
func (tm *TupleManager) GetStats() map[string]int64 {
|
||
return map[string]int64{
|
||
"created": atomic.LoadInt64(&tm.stats.created),
|
||
"updated": atomic.LoadInt64(&tm.stats.updated),
|
||
"deleted": atomic.LoadInt64(&tm.stats.deleted),
|
||
"read": atomic.LoadInt64(&tm.stats.read),
|
||
}
|
||
}
|