289 lines
8.3 KiB
Go
289 lines
8.3 KiB
Go
|
|
// /futriis/internal/storage/tuple.go
|
|||
|
|
// Пакет storage реализует операции с кортежами (записями) - базовыми единицами данных.
|
|||
|
|
// TupleManager предоставляет wait-free операции создания, чтения, обновления и удаления кортежей
|
|||
|
|
// с использованием атомарных счётчиков для статистики и поддержкой индексов.
|
|||
|
|
|
|||
|
|
package storage
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"errors"
|
|||
|
|
"fmt"
|
|||
|
|
"reflect"
|
|||
|
|
"sync"
|
|||
|
|
"sync/atomic"
|
|||
|
|
"time"
|
|||
|
|
"unsafe"
|
|||
|
|
|
|||
|
|
"futriis/pkg/types"
|
|||
|
|
"futriis/pkg/utils"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// TupleManager управляет операциями с кортежами
|
|||
|
|
type TupleManager struct {
|
|||
|
|
stats struct {
|
|||
|
|
created int64
|
|||
|
|
updated int64
|
|||
|
|
deleted int64
|
|||
|
|
read int64
|
|||
|
|
}
|
|||
|
|
columnCompressors map[string]*ColumnCompressor // Имя колонки -> компрессор
|
|||
|
|
// Добавляем мьютекс для синхронизации доступа к кортежам
|
|||
|
|
mu sync.RWMutex
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewTupleManager создаёт новый менеджер кортежей
|
|||
|
|
func NewTupleManager() *TupleManager {
|
|||
|
|
return &TupleManager{
|
|||
|
|
columnCompressors: make(map[string]*ColumnCompressor),
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// вспомогательная функция для доступа к неэкспортируемому полю tuples в Slice
|
|||
|
|
func getTuplesFromSlice(slice *types.Slice) map[string]*types.Tuple {
|
|||
|
|
v := reflect.ValueOf(slice).Elem()
|
|||
|
|
field := v.FieldByName("tuples")
|
|||
|
|
if field.IsValid() && field.Kind() == reflect.Map {
|
|||
|
|
result := make(map[string]*types.Tuple)
|
|||
|
|
iter := field.MapRange()
|
|||
|
|
for iter.Next() {
|
|||
|
|
key := iter.Key().String()
|
|||
|
|
value := iter.Value().Interface()
|
|||
|
|
if tuple, ok := value.(*types.Tuple); ok {
|
|||
|
|
result[key] = tuple
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return result
|
|||
|
|
}
|
|||
|
|
return make(map[string]*types.Tuple)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// вспомогательная функция для добавления кортежа в неэкспортируемое поле tuples
|
|||
|
|
func addTupleToSlice(slice *types.Slice, id string, tuple *types.Tuple) error {
|
|||
|
|
v := reflect.ValueOf(slice).Elem()
|
|||
|
|
field := v.FieldByName("tuples")
|
|||
|
|
if field.IsValid() && field.Kind() == reflect.Map {
|
|||
|
|
if field.IsNil() {
|
|||
|
|
newMap := reflect.MakeMap(reflect.TypeOf(map[string]*types.Tuple{}))
|
|||
|
|
field.Set(newMap)
|
|||
|
|
}
|
|||
|
|
key := reflect.ValueOf(id)
|
|||
|
|
value := reflect.ValueOf(tuple)
|
|||
|
|
field.SetMapIndex(key, value)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
return errors.New("cannot access tuples field in slice")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// вспомогательная функция для удаления кортежа из неэкспортируемого поля tuples
|
|||
|
|
func removeTupleFromSlice(slice *types.Slice, id string) error {
|
|||
|
|
v := reflect.ValueOf(slice).Elem()
|
|||
|
|
field := v.FieldByName("tuples")
|
|||
|
|
if field.IsValid() && field.Kind() == reflect.Map {
|
|||
|
|
key := reflect.ValueOf(id)
|
|||
|
|
field.SetMapIndex(key, reflect.Value{})
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
return errors.New("cannot access tuples field in slice")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// вспомогательная функция для проверки существования кортежа
|
|||
|
|
func tupleExistsInSlice(slice *types.Slice, id string) bool {
|
|||
|
|
tuples := getTuplesFromSlice(slice)
|
|||
|
|
_, exists := tuples[id]
|
|||
|
|
return exists
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// CreateTuple создаёт новый кортеж в указанном слайсе с временной меткой
|
|||
|
|
// Wait-free операция: использует атомарные операции для счётчиков
|
|||
|
|
func (tm *TupleManager) CreateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
|
|||
|
|
if slice == nil {
|
|||
|
|
return nil, errors.New("slice is nil")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
tm.mu.Lock()
|
|||
|
|
defer tm.mu.Unlock()
|
|||
|
|
|
|||
|
|
// Проверяем существование кортежа
|
|||
|
|
if tupleExistsInSlice(slice, id) {
|
|||
|
|
return nil, errors.New("tuple already exists")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Создаём новый кортеж с временной меткой
|
|||
|
|
tuple := types.NewTuple(id)
|
|||
|
|
tuple.CreatedAt = time.Now()
|
|||
|
|
tuple.UpdatedAt = time.Now()
|
|||
|
|
|
|||
|
|
for k, v := range fields {
|
|||
|
|
tuple.Fields[k] = v
|
|||
|
|
|
|||
|
|
// Создаём компрессор для колонки, если его нет
|
|||
|
|
if _, ok := tm.columnCompressors[k]; !ok {
|
|||
|
|
colType := getFieldType(v)
|
|||
|
|
tm.columnCompressors[k] = NewColumnCompressor(colType)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Добавляем в слайс
|
|||
|
|
err := addTupleToSlice(slice, id, tuple)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Обновляем индексы, если они есть
|
|||
|
|
// Получаем доступ к индексам через таппл
|
|||
|
|
// В реальном коде здесь должна быть ссылка на IndexManager
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&tm.stats.created, 1)
|
|||
|
|
|
|||
|
|
logger := utils.GetLogger()
|
|||
|
|
if logger != nil {
|
|||
|
|
logger.Log("INFO", fmt.Sprintf("Created tuple: %s at %s", id, tuple.CreatedAt.Format(time.RFC3339)))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return tuple, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ReadTuple читает кортеж по ID
|
|||
|
|
// Wait-free операция для чтения
|
|||
|
|
func (tm *TupleManager) ReadTuple(slice *types.Slice, id string) (*types.Tuple, error) {
|
|||
|
|
if slice == nil {
|
|||
|
|
return nil, errors.New("slice is nil")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
tm.mu.RLock()
|
|||
|
|
defer tm.mu.RUnlock()
|
|||
|
|
|
|||
|
|
tuples := getTuplesFromSlice(slice)
|
|||
|
|
tuple, exists := tuples[id]
|
|||
|
|
if !exists {
|
|||
|
|
return nil, errors.New("tuple not found")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&tm.stats.read, 1)
|
|||
|
|
return tuple, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// UpdateTuple обновляет поля кортежа с обновлением временной метки
|
|||
|
|
func (tm *TupleManager) UpdateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
|
|||
|
|
if slice == nil {
|
|||
|
|
return nil, errors.New("slice is nil")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
tm.mu.Lock()
|
|||
|
|
defer tm.mu.Unlock()
|
|||
|
|
|
|||
|
|
tuples := getTuplesFromSlice(slice)
|
|||
|
|
tuple, exists := tuples[id]
|
|||
|
|
if !exists {
|
|||
|
|
return nil, errors.New("tuple not found")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Обновляем поля
|
|||
|
|
for k, v := range fields {
|
|||
|
|
tuple.Fields[k] = v
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Обновляем временную метку
|
|||
|
|
tuple.UpdatedAt = time.Now()
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&tm.stats.updated, 1)
|
|||
|
|
|
|||
|
|
logger := utils.GetLogger()
|
|||
|
|
if logger != nil {
|
|||
|
|
logger.Log("INFO", fmt.Sprintf("Updated tuple: %s at %s", id, tuple.UpdatedAt.Format(time.RFC3339)))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return tuple, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// DeleteTuple удаляет кортеж
|
|||
|
|
func (tm *TupleManager) DeleteTuple(slice *types.Slice, id string) error {
|
|||
|
|
if slice == nil {
|
|||
|
|
return errors.New("slice is nil")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
tm.mu.Lock()
|
|||
|
|
defer tm.mu.Unlock()
|
|||
|
|
|
|||
|
|
if !tupleExistsInSlice(slice, id) {
|
|||
|
|
return errors.New("tuple not found")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
err := removeTupleFromSlice(slice, id)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
atomic.AddInt64(&tm.stats.deleted, 1)
|
|||
|
|
|
|||
|
|
logger := utils.GetLogger()
|
|||
|
|
if logger != nil {
|
|||
|
|
logger.Log("INFO", "Deleted tuple: "+id)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// FindTuplesByIndex выполняет поиск кортежей по индексу
|
|||
|
|
func (tm *TupleManager) FindTuplesByIndex(index *Index, key string) ([]unsafe.Pointer, error) {
|
|||
|
|
if index == nil {
|
|||
|
|
return nil, errors.New("index is nil")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ptr, exists := index.Lookup(key)
|
|||
|
|
if !exists {
|
|||
|
|
return nil, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return []unsafe.Pointer{ptr}, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// CompressColumn сжимает данные колонки
|
|||
|
|
func (tm *TupleManager) CompressColumn(columnName string, data []interface{}) ([]byte, error) {
|
|||
|
|
compressor, exists := tm.columnCompressors[columnName]
|
|||
|
|
if !exists {
|
|||
|
|
// Создаём компрессор по умолчанию
|
|||
|
|
compressor = NewColumnCompressor("unknown")
|
|||
|
|
tm.columnCompressors[columnName] = compressor
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return compressor.Compress(data)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetCompressionStats возвращает статистику сжатия
|
|||
|
|
func (tm *TupleManager) GetCompressionStats() map[string]interface{} {
|
|||
|
|
stats := make(map[string]interface{})
|
|||
|
|
|
|||
|
|
for colName, compressor := range tm.columnCompressors {
|
|||
|
|
stats[colName] = compressor.GetStats()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return stats
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// getFieldType определяет тип поля
|
|||
|
|
func getFieldType(v interface{}) string {
|
|||
|
|
switch v.(type) {
|
|||
|
|
case int, int64, int32:
|
|||
|
|
return "int"
|
|||
|
|
case float32, float64:
|
|||
|
|
return "float64"
|
|||
|
|
case string:
|
|||
|
|
return "string"
|
|||
|
|
case bool:
|
|||
|
|
return "bool"
|
|||
|
|
default:
|
|||
|
|
return "unknown"
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetStats возвращает статистику операций
|
|||
|
|
func (tm *TupleManager) GetStats() map[string]int64 {
|
|||
|
|
return map[string]int64{
|
|||
|
|
"created": atomic.LoadInt64(&tm.stats.created),
|
|||
|
|
"updated": atomic.LoadInt64(&tm.stats.updated),
|
|||
|
|
"deleted": atomic.LoadInt64(&tm.stats.deleted),
|
|||
|
|
"read": atomic.LoadInt64(&tm.stats.read),
|
|||
|
|
}
|
|||
|
|
}
|