// /futriis/internal/storage/tuple.go // Пакет storage реализует операции с кортежами (записями) - базовыми единицами данных. // TupleManager предоставляет wait-free операции создания, чтения, обновления и удаления кортежей // с использованием атомарных счётчиков для статистики и поддержкой индексов. package storage import ( "errors" "fmt" "reflect" "sync" "sync/atomic" "time" "unsafe" "futriis/pkg/types" "futriis/pkg/utils" ) // TupleManager управляет операциями с кортежами type TupleManager struct { stats struct { created int64 updated int64 deleted int64 read int64 } columnCompressors map[string]*ColumnCompressor // Имя колонки -> компрессор // Добавляем мьютекс для синхронизации доступа к кортежам mu sync.RWMutex } // NewTupleManager создаёт новый менеджер кортежей func NewTupleManager() *TupleManager { return &TupleManager{ columnCompressors: make(map[string]*ColumnCompressor), } } // вспомогательная функция для доступа к неэкспортируемому полю tuples в Slice func getTuplesFromSlice(slice *types.Slice) map[string]*types.Tuple { v := reflect.ValueOf(slice).Elem() field := v.FieldByName("tuples") if field.IsValid() && field.Kind() == reflect.Map { result := make(map[string]*types.Tuple) iter := field.MapRange() for iter.Next() { key := iter.Key().String() value := iter.Value().Interface() if tuple, ok := value.(*types.Tuple); ok { result[key] = tuple } } return result } return make(map[string]*types.Tuple) } // вспомогательная функция для добавления кортежа в неэкспортируемое поле tuples func addTupleToSlice(slice *types.Slice, id string, tuple *types.Tuple) error { v := reflect.ValueOf(slice).Elem() field := v.FieldByName("tuples") if field.IsValid() && field.Kind() == reflect.Map { if field.IsNil() { newMap := reflect.MakeMap(reflect.TypeOf(map[string]*types.Tuple{})) field.Set(newMap) } key := reflect.ValueOf(id) value := reflect.ValueOf(tuple) field.SetMapIndex(key, value) return nil } return errors.New("cannot access tuples field in slice") } // вспомогательная функция для удаления кортежа из неэкспортируемого поля tuples func removeTupleFromSlice(slice *types.Slice, id string) error { v := reflect.ValueOf(slice).Elem() field := v.FieldByName("tuples") if field.IsValid() && field.Kind() == reflect.Map { key := reflect.ValueOf(id) field.SetMapIndex(key, reflect.Value{}) return nil } return errors.New("cannot access tuples field in slice") } // вспомогательная функция для проверки существования кортежа func tupleExistsInSlice(slice *types.Slice, id string) bool { tuples := getTuplesFromSlice(slice) _, exists := tuples[id] return exists } // CreateTuple создаёт новый кортеж в указанном слайсе с временной меткой // Wait-free операция: использует атомарные операции для счётчиков func (tm *TupleManager) CreateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) { if slice == nil { return nil, errors.New("slice is nil") } tm.mu.Lock() defer tm.mu.Unlock() // Проверяем существование кортежа if tupleExistsInSlice(slice, id) { return nil, errors.New("tuple already exists") } // Создаём новый кортеж с временной меткой tuple := types.NewTuple(id) tuple.CreatedAt = time.Now() tuple.UpdatedAt = time.Now() for k, v := range fields { tuple.Fields[k] = v // Создаём компрессор для колонки, если его нет if _, ok := tm.columnCompressors[k]; !ok { colType := getFieldType(v) tm.columnCompressors[k] = NewColumnCompressor(colType) } } // Добавляем в слайс err := addTupleToSlice(slice, id, tuple) if err != nil { return nil, err } // Обновляем индексы, если они есть // Получаем доступ к индексам через таппл // В реальном коде здесь должна быть ссылка на IndexManager atomic.AddInt64(&tm.stats.created, 1) logger := utils.GetLogger() if logger != nil { logger.Log("INFO", fmt.Sprintf("Created tuple: %s at %s", id, tuple.CreatedAt.Format(time.RFC3339))) } return tuple, nil } // ReadTuple читает кортеж по ID // Wait-free операция для чтения func (tm *TupleManager) ReadTuple(slice *types.Slice, id string) (*types.Tuple, error) { if slice == nil { return nil, errors.New("slice is nil") } tm.mu.RLock() defer tm.mu.RUnlock() tuples := getTuplesFromSlice(slice) tuple, exists := tuples[id] if !exists { return nil, errors.New("tuple not found") } atomic.AddInt64(&tm.stats.read, 1) return tuple, nil } // UpdateTuple обновляет поля кортежа с обновлением временной метки func (tm *TupleManager) UpdateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) { if slice == nil { return nil, errors.New("slice is nil") } tm.mu.Lock() defer tm.mu.Unlock() tuples := getTuplesFromSlice(slice) tuple, exists := tuples[id] if !exists { return nil, errors.New("tuple not found") } // Обновляем поля for k, v := range fields { tuple.Fields[k] = v } // Обновляем временную метку tuple.UpdatedAt = time.Now() atomic.AddInt64(&tm.stats.updated, 1) logger := utils.GetLogger() if logger != nil { logger.Log("INFO", fmt.Sprintf("Updated tuple: %s at %s", id, tuple.UpdatedAt.Format(time.RFC3339))) } return tuple, nil } // DeleteTuple удаляет кортеж func (tm *TupleManager) DeleteTuple(slice *types.Slice, id string) error { if slice == nil { return errors.New("slice is nil") } tm.mu.Lock() defer tm.mu.Unlock() if !tupleExistsInSlice(slice, id) { return errors.New("tuple not found") } err := removeTupleFromSlice(slice, id) if err != nil { return err } atomic.AddInt64(&tm.stats.deleted, 1) logger := utils.GetLogger() if logger != nil { logger.Log("INFO", "Deleted tuple: "+id) } return nil } // FindTuplesByIndex выполняет поиск кортежей по индексу func (tm *TupleManager) FindTuplesByIndex(index *Index, key string) ([]unsafe.Pointer, error) { if index == nil { return nil, errors.New("index is nil") } ptr, exists := index.Lookup(key) if !exists { return nil, nil } return []unsafe.Pointer{ptr}, nil } // CompressColumn сжимает данные колонки func (tm *TupleManager) CompressColumn(columnName string, data []interface{}) ([]byte, error) { compressor, exists := tm.columnCompressors[columnName] if !exists { // Создаём компрессор по умолчанию compressor = NewColumnCompressor("unknown") tm.columnCompressors[columnName] = compressor } return compressor.Compress(data) } // GetCompressionStats возвращает статистику сжатия func (tm *TupleManager) GetCompressionStats() map[string]interface{} { stats := make(map[string]interface{}) for colName, compressor := range tm.columnCompressors { stats[colName] = compressor.GetStats() } return stats } // getFieldType определяет тип поля func getFieldType(v interface{}) string { switch v.(type) { case int, int64, int32: return "int" case float32, float64: return "float64" case string: return "string" case bool: return "bool" default: return "unknown" } } // GetStats возвращает статистику операций func (tm *TupleManager) GetStats() map[string]int64 { return map[string]int64{ "created": atomic.LoadInt64(&tm.stats.created), "updated": atomic.LoadInt64(&tm.stats.updated), "deleted": atomic.LoadInt64(&tm.stats.deleted), "read": atomic.LoadInt64(&tm.stats.read), } }