first commit

This commit is contained in:
2026-04-19 16:42:41 +03:00
commit e82fb947be
37 changed files with 14591 additions and 0 deletions

126
internal/storage/audit.go Normal file
View File

@@ -0,0 +1,126 @@
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/audit.go
// Назначение: Аудит всех операций создания, изменения, удаления данных
// с записью временной метки с точностью до миллисекунды
package storage
import (
"fmt"
"sync"
"time"
)
// AuditEntry представляет запись аудита
type AuditEntry struct {
ID string `msgpack:"id"`
Timestamp int64 `msgpack:"timestamp"` // Unix миллисекунды
TimestampStr string `msgpack:"timestamp_str"` // Человекочитаемая строка
Operation string `msgpack:"operation"` // CREATE, UPDATE, DELETE, START, COMMIT, ABORT, CLUSTER
DataType string `msgpack:"data_type"` // DATABASE, COLLECTION, DOCUMENT, FIELD, TUPLE, SESSION, TRANSACTION, CLUSTER
Name string `msgpack:"name"` // Имя объекта
Details map[string]interface{} `msgpack:"details"` // Детали операции
}
// AuditLogger управляет аудитом
type AuditLogger struct {
entries []AuditEntry
mu sync.RWMutex
}
var globalAuditLogger = &AuditLogger{
entries: make([]AuditEntry, 0),
}
// GetCurrentTimestamp возвращает текущую временную метку с миллисекундами
func GetCurrentTimestamp() (int64, string) {
now := time.Now()
timestampMs := now.UnixMilli()
timestampStr := now.Format("2006-01-02 15:04:05.000")
return timestampMs, timestampStr
}
// LogAudit записывает событие в аудит
func LogAudit(operation, dataType, name string, details map[string]interface{}) {
timestampMs, timestampStr := GetCurrentTimestamp()
entry := AuditEntry{
ID: fmt.Sprintf("%d", timestampMs),
Timestamp: timestampMs,
TimestampStr: timestampStr,
Operation: operation,
DataType: dataType,
Name: name,
Details: details,
}
globalAuditLogger.mu.Lock()
globalAuditLogger.entries = append(globalAuditLogger.entries, entry)
globalAuditLogger.mu.Unlock()
}
// GetAuditLog возвращает копию лога аудита
func GetAuditLog() []AuditEntry {
globalAuditLogger.mu.RLock()
defer globalAuditLogger.mu.RUnlock()
result := make([]AuditEntry, len(globalAuditLogger.entries))
copy(result, globalAuditLogger.entries)
return result
}
// AuditDatabaseOperation логирует операцию с базой данных
func AuditDatabaseOperation(operation, dbName string) {
LogAudit(operation, "DATABASE", dbName, map[string]interface{}{
"database": dbName,
})
}
// AuditCollectionOperation логирует операцию с коллекцией
func AuditCollectionOperation(operation, dbName, collName string, settings interface{}) {
LogAudit(operation, "COLLECTION", fmt.Sprintf("%s.%s", dbName, collName), map[string]interface{}{
"database": dbName,
"collection": collName,
"settings": settings,
})
}
// AuditDocumentOperation логирует операцию с документом
func AuditDocumentOperation(operation, dbName, collName, docID string, fields map[string]interface{}) {
LogAudit(operation, "DOCUMENT", fmt.Sprintf("%s.%s.%s", dbName, collName, docID), map[string]interface{}{
"database": dbName,
"collection": collName,
"document_id": docID,
"fields": fields,
})
}
// AuditFieldOperation логирует операцию с полем
func AuditFieldOperation(operation, dbName, collName, docID, fieldName string, value interface{}) {
LogAudit(operation, "FIELD", fmt.Sprintf("%s.%s.%s.%s", dbName, collName, docID, fieldName), map[string]interface{}{
"database": dbName,
"collection": collName,
"document_id": docID,
"field": fieldName,
"value": value,
})
}
// AuditTupleOperation логирует операцию с кортежем
func AuditTupleOperation(operation, dbName, collName, docID, tuplePath string) {
LogAudit(operation, "TUPLE", fmt.Sprintf("%s.%s.%s.%s", dbName, collName, docID, tuplePath), map[string]interface{}{
"database": dbName,
"collection": collName,
"document_id": docID,
"tuple_path": tuplePath,
})
}

View File

@@ -0,0 +1,766 @@
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/collection.go
// Назначение: Реализация коллекции с индексами (первичными и вторичными).
// Индексы хранятся отдельно от документов, обеспечивают wait-free доступ.
// Исправлено: корректная работа уникальных индексов, удаление из индексов при обновлении.
package storage
import (
"fmt"
"sync"
"sync/atomic"
"time"
"strings"
"futriis/internal/serializer"
)
// Collection представляет коллекцию документов (аналог таблицы)
type Collection struct {
name string
docs sync.Map // map[string]*Document - wait-free хранилище документов
indexes sync.Map // map[string]*Index - индексы для быстрого поиска
metadata *CollectionMetadata // Метаданные коллекции
docCount atomic.Int64 // Атомарный счётчик документов
sizeBytes atomic.Int64 // Атомарный размер коллекции в байтах
mu sync.RWMutex // Для операций, изменяющих структуру коллекции
constraints *Constraints // Ограничения коллекции
acl *CollectionACL // ACL для коллекции
}
// CollectionMetadata содержит метаданные коллекции
type CollectionMetadata struct {
Name string `msgpack:"name"`
CreatedAt int64 `msgpack:"created_at"`
UpdatedAt int64 `msgpack:"updated_at"`
DocumentCount int64 `msgpack:"document_count"`
SizeBytes int64 `msgpack:"size_bytes"`
IndexCount int `msgpack:"index_count"`
Settings *CollectionSettings `msgpack:"settings"`
}
// CollectionSettings содержит настройки коллекции
type CollectionSettings struct {
MaxDocuments int `msgpack:"max_documents"` // Максимальное количество документов (0 = безлимит)
ValidateSchema bool `msgpack:"validate_schema"` // Валидировать схему документов
AutoIndexID bool `msgpack:"auto_index_id"` // Автоматически индексировать поле _id
TTLSeconds int `msgpack:"ttl_seconds"` // Время жизни документов (0 = бессрочно)
}
// Index представляет индекс для ускорения поиска (хранится отдельно от документов)
type Index struct {
Name string `msgpack:"name"`
Fields []string `msgpack:"fields"` // Поля для индексации
Unique bool `msgpack:"unique"` // Уникальный индекс
data sync.Map // map[interface{}]string - значение индекса -> ID документа
}
// Constraints представляет ограничения на коллекцию
type Constraints struct {
mu sync.RWMutex
RequiredFields map[string]bool // Обязательные поля
UniqueFields map[string]bool // Уникальные поля (дополнительно к индексам)
MinValues map[string]float64 // Минимальные значения для числовых полей
MaxValues map[string]float64 // Максимальные значения для числовых полей
PatternFields map[string]string // Regexp паттерны для полей
EnumFields map[string][]interface{} // Допустимые значения для полей
}
// CollectionACL представляет список контроля доступа для коллекции
type CollectionACL struct {
mu sync.RWMutex
ReadRoles map[string]bool // Роли, имеющие доступ на чтение
WriteRoles map[string]bool // Роли, имеющие доступ на запись
DeleteRoles map[string]bool // Роли, имеющие доступ на удаление
AdminRoles map[string]bool // Роли, имеющие полный доступ
}
// NewCollection создаёт новую коллекцию
func NewCollection(name string, settings *CollectionSettings) *Collection {
if settings == nil {
settings = &CollectionSettings{
MaxDocuments: 0,
ValidateSchema: false,
AutoIndexID: true,
TTLSeconds: 0,
}
}
now := time.Now().UnixMilli()
coll := &Collection{
name: name,
metadata: &CollectionMetadata{
Name: name,
CreatedAt: now,
UpdatedAt: now,
DocumentCount: 0,
SizeBytes: 0,
IndexCount: 0,
Settings: settings,
},
constraints: &Constraints{
RequiredFields: make(map[string]bool),
UniqueFields: make(map[string]bool),
MinValues: make(map[string]float64),
MaxValues: make(map[string]float64),
PatternFields: make(map[string]string),
EnumFields: make(map[string][]interface{}),
},
acl: &CollectionACL{
ReadRoles: make(map[string]bool),
WriteRoles: make(map[string]bool),
DeleteRoles: make(map[string]bool),
AdminRoles: make(map[string]bool),
},
}
// Автоматически создаём первичный индекс по _id
if settings.AutoIndexID {
coll.CreateIndex("_id_", []string{"_id"}, true)
}
// Запускаем фоновую задачу для удаления просроченных документов
if settings.TTLSeconds > 0 {
go coll.ttlCleanupLoop()
}
return coll
}
// Name возвращает имя коллекции
func (c *Collection) Name() string {
return c.name
}
// Insert вставляет документ в коллекцию (wait-free)
func (c *Collection) Insert(doc *Document) error {
// Проверка ограничений
if err := c.constraints.ValidateDocument(doc); err != nil {
return fmt.Errorf("constraint violation: %v", err)
}
// Проверка ACL (будет вызвано из верхнего уровня с ролью)
// Проверка на максимальное количество документов
if c.metadata.Settings.MaxDocuments > 0 {
if c.docCount.Load() >= int64(c.metadata.Settings.MaxDocuments) {
return fmt.Errorf("collection is full: max documents %d reached", c.metadata.Settings.MaxDocuments)
}
}
// Валидация схемы (если включена)
if c.metadata.Settings.ValidateSchema {
if err := c.validateDocument(doc); err != nil {
return fmt.Errorf("document validation failed: %v", err)
}
}
// Проверка уникальных индексов
if err := c.checkUniqueConstraints(doc); err != nil {
return err
}
// Сериализация для проверки (опционально)
data, err := serializer.Marshal(doc)
if err != nil {
return fmt.Errorf("failed to serialize document: %v", err)
}
// Атомарное сохранение документа
if _, loaded := c.docs.LoadOrStore(doc.ID, doc); loaded {
return fmt.Errorf("document with id %s already exists", doc.ID)
}
// Обновление индексов (wait-free)
c.updateIndexes(doc, true)
// Обновление метаданных
c.docCount.Add(1)
c.sizeBytes.Add(int64(len(data)))
c.metadata.DocumentCount = c.docCount.Load()
c.metadata.SizeBytes = c.sizeBytes.Load()
c.metadata.UpdatedAt = time.Now().UnixMilli()
return nil
}
// InsertFromMap создаёт и вставляет документ из map
func (c *Collection) InsertFromMap(fields map[string]interface{}) error {
doc := NewDocument()
for k, v := range fields {
doc.SetField(k, v)
}
return c.Insert(doc)
}
// Find находит документ по ID (с использованием первичного индекса)
func (c *Collection) Find(id string) (*Document, error) {
if val, ok := c.docs.Load(id); ok {
doc := val.(*Document)
// Проверяем, не истёк ли TTL
if c.metadata.Settings.TTLSeconds > 0 {
if time.Now().UnixMilli()-doc.CreatedAt > int64(c.metadata.Settings.TTLSeconds*1000) {
c.Delete(id) // Автоматически удаляем просроченный документ
return nil, fmt.Errorf("key not found")
}
}
return doc, nil
}
return nil, fmt.Errorf("key not found")
}
// compareValues сравнивает два значения с учётом типа
func compareValues(a, b interface{}) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
// Пробуем прямое сравнение
if a == b {
return true
}
// Пробуем сравнение через строковое представление для разных типов
aStr := fmt.Sprintf("%v", a)
bStr := fmt.Sprintf("%v", b)
return aStr == bStr
}
// FindByIndex находит документы по значению индексированного поля
// Исправлено: корректный поиск для неуникальных индексов с правильным сравнением типов
func (c *Collection) FindByIndex(indexName string, value interface{}) ([]*Document, error) {
idxVal, ok := c.indexes.Load(indexName)
if !ok {
return nil, fmt.Errorf("index not found: %s", indexName)
}
index := idxVal.(*Index)
docs := make([]*Document, 0)
if index.Unique {
// Уникальный индекс возвращает один документ
if docID, ok := index.data.Load(value); ok {
if doc, err := c.Find(docID.(string)); err == nil {
docs = append(docs, doc)
}
}
} else {
// Исправлено: для неуникального индекса используем Range с правильным сравнением
index.data.Range(func(key, val interface{}) bool {
// key - значение индекса, val - ID документа
if compareValues(key, value) {
if doc, err := c.Find(val.(string)); err == nil {
docs = append(docs, doc)
}
}
return true
})
}
return docs, nil
}
// FindByIndexPrefix находит документы по префиксу индекса (для строковых полей)
func (c *Collection) FindByIndexPrefix(indexName string, prefix string) ([]*Document, error) {
idxVal, ok := c.indexes.Load(indexName)
if !ok {
return nil, fmt.Errorf("index not found: %s", indexName)
}
index := idxVal.(*Index)
docs := make([]*Document, 0)
index.data.Range(func(key, val interface{}) bool {
if keyStr, ok := key.(string); ok {
if strings.HasPrefix(keyStr, prefix) {
if doc, err := c.Find(val.(string)); err == nil {
docs = append(docs, doc)
}
}
}
return true
})
return docs, nil
}
// Update обновляет документ по ID
// Исправлено: корректное обновление индексов при изменении индексированных полей
func (c *Collection) Update(id string, updates map[string]interface{}) error {
val, ok := c.docs.Load(id)
if !ok {
return fmt.Errorf("key not found")
}
oldDoc := val.(*Document)
// Создаём копию для проверки уникальности
newDoc := oldDoc.Clone()
if err := newDoc.Update(updates); err != nil {
return err
}
// Проверяем ограничения
if err := c.constraints.ValidateDocument(newDoc); err != nil {
return fmt.Errorf("constraint violation: %v", err)
}
// Проверяем уникальные индексы
if err := c.checkUniqueConstraintsUpdate(oldDoc, newDoc); err != nil {
return err
}
// Исправлено: сначала удаляем старые индексы, потом добавляем новые
c.removeFromIndexes(oldDoc)
c.addToIndexes(newDoc)
// Сохраняем обновлённый документ
c.docs.Store(id, newDoc)
c.metadata.UpdatedAt = time.Now().UnixMilli()
return nil
}
// Delete удаляет документ по ID
// Исправлено: удаление из всех индексов перед удалением документа
func (c *Collection) Delete(id string) error {
val, ok := c.docs.Load(id)
if !ok {
return fmt.Errorf("key not found")
}
doc := val.(*Document)
// Удаляем из индексов
c.removeFromIndexes(doc)
// Удаляем документ
c.docs.Delete(id)
// Обновляем метаданные
c.docCount.Add(-1)
// Размер не обновляем для простоты (можно пересчитать при необходимости)
c.metadata.DocumentCount = c.docCount.Load()
c.metadata.UpdatedAt = time.Now().UnixMilli()
return nil
}
// removeFromIndexes удаляет документ из всех индексов (wait-free)
func (c *Collection) removeFromIndexes(doc *Document) {
c.indexes.Range(func(key, value interface{}) bool {
index := value.(*Index)
indexValue := c.extractIndexValue(doc, index.Fields)
index.data.Delete(indexValue)
return true
})
}
// addToIndexes добавляет документ во все индексы (wait-free)
func (c *Collection) addToIndexes(doc *Document) {
c.indexes.Range(func(key, value interface{}) bool {
index := value.(*Index)
indexValue := c.extractIndexValue(doc, index.Fields)
if index.Unique {
index.data.LoadOrStore(indexValue, doc.ID)
} else {
index.data.Store(indexValue, doc.ID)
}
return true
})
}
// CreateIndex создаёт новый индекс на коллекции (wait-free friendly)
func (c *Collection) CreateIndex(name string, fields []string, unique bool) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.indexes.Load(name); exists {
return fmt.Errorf("index %s already exists", name)
}
index := &Index{
Name: name,
Fields: fields,
Unique: unique,
}
// Строим индекс на существующих документах (wait-free)
c.docs.Range(func(key, value interface{}) bool {
doc := value.(*Document)
indexValue := c.extractIndexValue(doc, fields)
if unique {
if _, loaded := index.data.LoadOrStore(indexValue, doc.ID); loaded {
// Найден дубликат - откатываем создание индекса
c.mu.Unlock()
return false
}
} else {
index.data.Store(indexValue, doc.ID)
}
return true
})
c.indexes.Store(name, index)
c.metadata.IndexCount++
return nil
}
// DropIndex удаляет индекс
func (c *Collection) DropIndex(name string) error {
if _, exists := c.indexes.LoadAndDelete(name); !exists {
return fmt.Errorf("index not found: %s", name)
}
c.metadata.IndexCount--
return nil
}
// GetIndexes возвращает список всех индексов
func (c *Collection) GetIndexes() []string {
names := make([]string, 0)
c.indexes.Range(func(key, value interface{}) bool {
names = append(names, key.(string))
return true
})
return names
}
// extractIndexValue извлекает значение из документа для индексации
func (c *Collection) extractIndexValue(doc *Document, fields []string) interface{} {
if len(fields) == 1 {
val, _ := doc.GetField(fields[0])
return val
}
// Составной индекс - возвращаем строковое представление
parts := make([]string, 0, len(fields))
for _, field := range fields {
if val, err := doc.GetField(field); err == nil {
parts = append(parts, fmt.Sprintf("%v", val))
} else {
parts = append(parts, "NULL")
}
}
return strings.Join(parts, "|")
}
// updateIndexes обновляет индексы для документа (исправлено: используем отдельные методы)
func (c *Collection) updateIndexes(doc *Document, add bool) {
if add {
c.addToIndexes(doc)
} else {
c.removeFromIndexes(doc)
}
}
// checkUniqueConstraints проверяет уникальные индексы перед вставкой
func (c *Collection) checkUniqueConstraints(doc *Document) error {
var errs []string
c.indexes.Range(func(key, value interface{}) bool {
index := value.(*Index)
if index.Unique {
indexValue := c.extractIndexValue(doc, index.Fields)
if _, exists := index.data.Load(indexValue); exists {
errs = append(errs, fmt.Sprintf("duplicate key for index %s: %v", index.Name, indexValue))
}
}
return true
})
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, "; "))
}
return nil
}
// checkUniqueConstraintsUpdate проверяет уникальность при обновлении
func (c *Collection) checkUniqueConstraintsUpdate(oldDoc, newDoc *Document) error {
var errs []string
c.indexes.Range(func(key, value interface{}) bool {
index := value.(*Index)
if index.Unique {
oldValue := c.extractIndexValue(oldDoc, index.Fields)
newValue := c.extractIndexValue(newDoc, index.Fields)
if fmt.Sprintf("%v", oldValue) != fmt.Sprintf("%v", newValue) {
if _, exists := index.data.Load(newValue); exists {
errs = append(errs, fmt.Sprintf("duplicate key for index %s: %v", index.Name, newValue))
}
}
}
return true
})
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, "; "))
}
return nil
}
// validateDocument валидирует документ согласно схеме коллекции
func (c *Collection) validateDocument(doc *Document) error {
if doc.ID == "" {
return fmt.Errorf("document must have _id field")
}
return nil
}
// ttlCleanupLoop периодически удаляет просроченные документы
func (c *Collection) ttlCleanupLoop() {
ticker := time.NewTicker(time.Duration(c.metadata.Settings.TTLSeconds/2) * time.Second)
defer ticker.Stop()
for range ticker.C {
now := time.Now().UnixMilli()
toDelete := make([]string, 0)
c.docs.Range(func(key, value interface{}) bool {
doc := value.(*Document)
if now-doc.CreatedAt > int64(c.metadata.Settings.TTLSeconds*1000) {
toDelete = append(toDelete, doc.ID)
}
return true
})
for _, id := range toDelete {
c.Delete(id)
}
}
}
// Count возвращает количество документов в коллекции
func (c *Collection) Count() int64 {
return c.docCount.Load()
}
// Size возвращает размер коллекции в байтах
func (c *Collection) Size() int64 {
return c.sizeBytes.Load()
}
// GetAllDocuments возвращает все документы коллекции
func (c *Collection) GetAllDocuments() []*Document {
docs := make([]*Document, 0, c.docCount.Load())
c.docs.Range(func(key, value interface{}) bool {
docs = append(docs, value.(*Document))
return true
})
return docs
}
// FindByFilter находит документы по произвольному фильтру
func (c *Collection) FindByFilter(filter func(*Document) bool) []*Document {
results := make([]*Document, 0)
c.docs.Range(func(key, value interface{}) bool {
doc := value.(*Document)
if filter(doc) {
results = append(results, doc)
}
return true
})
return results
}
// GetMetadata возвращает метаданные коллекции
func (c *Collection) GetMetadata() *CollectionMetadata {
c.mu.RLock()
defer c.mu.RUnlock()
return c.metadata
}
// Drop удаляет все документы из коллекции
func (c *Collection) Drop() error {
c.mu.Lock()
defer c.mu.Unlock()
c.docs = sync.Map{}
c.indexes = sync.Map{}
if c.metadata.Settings.AutoIndexID {
c.CreateIndex("_id_", []string{"_id"}, true)
}
c.docCount.Store(0)
c.sizeBytes.Store(0)
c.metadata.DocumentCount = 0
c.metadata.SizeBytes = 0
c.metadata.UpdatedAt = time.Now().UnixMilli()
return nil
}
// ========== Constraints Methods ==========
// AddRequiredField добавляет обязательное поле
func (c *Collection) AddRequiredField(field string) {
c.constraints.mu.Lock()
defer c.constraints.mu.Unlock()
c.constraints.RequiredFields[field] = true
}
// AddUniqueConstraint добавляет ограничение уникальности
func (c *Collection) AddUniqueConstraint(field string) {
c.constraints.mu.Lock()
defer c.constraints.mu.Unlock()
c.constraints.UniqueFields[field] = true
// Также создаём уникальный индекс
c.CreateIndex("unique_"+field, []string{field}, true)
}
// AddMinConstraint добавляет минимальное значение
func (c *Collection) AddMinConstraint(field string, min float64) {
c.constraints.mu.Lock()
defer c.constraints.mu.Unlock()
c.constraints.MinValues[field] = min
}
// AddMaxConstraint добавляет максимальное значение
func (c *Collection) AddMaxConstraint(field string, max float64) {
c.constraints.mu.Lock()
defer c.constraints.mu.Unlock()
c.constraints.MaxValues[field] = max
}
// AddPatternConstraint добавляет regexp паттерн
func (c *Collection) AddPatternConstraint(field string, pattern string) {
c.constraints.mu.Lock()
defer c.constraints.mu.Unlock()
c.constraints.PatternFields[field] = pattern
}
// AddEnumConstraint добавляет допустимые значения
func (c *Collection) AddEnumConstraint(field string, values []interface{}) {
c.constraints.mu.Lock()
defer c.constraints.mu.Unlock()
c.constraints.EnumFields[field] = values
}
// ValidateDocument проверяет документ на соответствие ограничениям
func (cons *Constraints) ValidateDocument(doc *Document) error {
cons.mu.RLock()
defer cons.mu.RUnlock()
// Проверка обязательных полей
for field := range cons.RequiredFields {
if !doc.HasField(field) {
return fmt.Errorf("required field '%s' is missing", field)
}
}
// Проверка уникальности (дополнительно к индексам)
// (основная проверка в индексах)
// Проверка числовых ограничений
for field, minVal := range cons.MinValues {
if val, err := doc.GetField(field); err == nil {
if numVal, ok := toFloat64(val); ok {
if numVal < minVal {
return fmt.Errorf("field '%s' value %v is less than minimum %v", field, numVal, minVal)
}
}
}
}
for field, maxVal := range cons.MaxValues {
if val, err := doc.GetField(field); err == nil {
if numVal, ok := toFloat64(val); ok {
if numVal > maxVal {
return fmt.Errorf("field '%s' value %v exceeds maximum %v", field, numVal, maxVal)
}
}
}
}
// Проверка enum
for field, allowedValues := range cons.EnumFields {
if val, err := doc.GetField(field); err == nil {
found := false
for _, allowed := range allowedValues {
if fmt.Sprintf("%v", val) == fmt.Sprintf("%v", allowed) {
found = true
break
}
}
if !found {
return fmt.Errorf("field '%s' value '%v' not in allowed list", field, val)
}
}
}
return nil
}
// ========== ACL Methods ==========
// SetACL устанавливает ACL для коллекции
func (c *Collection) SetACL(role string, canRead, canWrite, canDelete, isAdmin bool) {
c.acl.mu.Lock()
defer c.acl.mu.Unlock()
if canRead {
c.acl.ReadRoles[role] = true
}
if canWrite {
c.acl.WriteRoles[role] = true
}
if canDelete {
c.acl.DeleteRoles[role] = true
}
if isAdmin {
c.acl.AdminRoles[role] = true
}
}
// CheckPermission проверяет наличие разрешения у роли
func (c *Collection) CheckPermission(role, operation string) bool {
c.acl.mu.RLock()
defer c.acl.mu.RUnlock()
// Администратор имеет все права
if c.acl.AdminRoles[role] {
return true
}
switch operation {
case "read":
return c.acl.ReadRoles[role]
case "write":
return c.acl.WriteRoles[role]
case "delete":
return c.acl.DeleteRoles[role]
default:
return false
}
}
// toFloat64 конвертирует interface{} в float64
func toFloat64(val interface{}) (float64, bool) {
switch v := val.(type) {
case int:
return float64(v), true
case int64:
return float64(v), true
case float64:
return v, true
case float32:
return float64(v), true
default:
return 0, false
}
}

View File

@@ -0,0 +1,490 @@
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/document.go
// Назначение: Определение структуры документа, его методов для работы
// с полями, кортежами (вложенными документами) и сериализации в MessagePack.
// Документ является основной единицей хранения в СУБД futriis.
package storage
import (
"fmt"
"strings"
"sync"
"time"
"futriis/internal/compression"
"futriis/internal/serializer"
"github.com/google/uuid"
)
// Document представляет документ в коллекции (аналог строки в реляционной СУБД)
type Document struct {
ID string `msgpack:"_id"` // Уникальный идентификатор документа
Fields map[string]interface{} `msgpack:"fields"` // Поля документа (аналог колонок)
CreatedAt int64 `msgpack:"created_at"` // Время создания (Unix миллисекунды)
UpdatedAt int64 `msgpack:"updated_at"` // Время последнего обновления
Version uint64 `msgpack:"version"` // Версия документа (для оптимистичных блокировок)
Compressed bool `msgpack:"compressed"` // Флаг, сжат ли документ
OriginalSize int64 `msgpack:"original_size"` // Оригинальный размер до сжатия
mu sync.RWMutex `msgpack:"-"` // Блокировка для wait-free операций
}
// Tuple представляет вложенный документ (аналог кортежа в реляционной СУБД)
type Tuple struct {
Fields map[string]interface{} `msgpack:"fields"`
mu sync.RWMutex
}
// Field представляет отдельное поле документа (аналог колонки)
type Field struct {
Name string `msgpack:"name"`
Type FieldType `msgpack:"type"`
Value interface{} `msgpack:"value"`
}
// FieldType определяет тип поля документа
type FieldType int
const (
TypeString FieldType = iota
TypeNumber
TypeBoolean
TypeTuple // Вложенный документ
TypeArray
TypeNull
)
// NewDocument создаёт новый документ с автоматической генерацией ID
func NewDocument() *Document {
now := time.Now().UnixMilli()
return &Document{
ID: uuid.New().String(),
Fields: make(map[string]interface{}),
CreatedAt: now,
UpdatedAt: now,
Version: 1,
Compressed: false,
OriginalSize: 0,
}
}
// NewDocumentWithID создаёт документ с указанным ID
func NewDocumentWithID(id string) *Document {
now := time.Now().UnixMilli()
return &Document{
ID: id,
Fields: make(map[string]interface{}),
CreatedAt: now,
UpdatedAt: now,
Version: 1,
Compressed: false,
OriginalSize: 0,
}
}
// SetField устанавливает значение поля документа (wait-free)
func (d *Document) SetField(name string, value interface{}) {
d.mu.Lock()
defer d.mu.Unlock()
d.Fields[name] = value
d.UpdatedAt = time.Now().UnixMilli()
d.Version++
d.Compressed = false // При изменении документа снимаем флаг сжатия
}
// GetField возвращает значение поля документа
func (d *Document) GetField(name string) (interface{}, error) {
d.mu.RLock()
defer d.mu.RUnlock()
if val, ok := d.Fields[name]; ok {
return val, nil
}
return nil, fmt.Errorf("field not found: %s", name)
}
// DeleteField удаляет поле из документа
func (d *Document) DeleteField(name string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.Fields, name)
d.UpdatedAt = time.Now().UnixMilli()
d.Version++
d.Compressed = false
}
// HasField проверяет наличие поля в документе
func (d *Document) HasField(name string) bool {
d.mu.RLock()
defer d.mu.RUnlock()
_, ok := d.Fields[name]
return ok
}
// GetFields возвращает копию всех полей документа
func (d *Document) GetFields() map[string]interface{} {
d.mu.RLock()
defer d.mu.RUnlock()
copy := make(map[string]interface{})
for k, v := range d.Fields {
copy[k] = v
}
return copy
}
// SetTuple устанавливает вложенный документ (кортеж) в поле
func (d *Document) SetTuple(fieldName string, tuple *Tuple) {
d.SetField(fieldName, tuple)
}
// GetTuple возвращает вложенный документ из поля
func (d *Document) GetTuple(fieldName string) (*Tuple, error) {
val, err := d.GetField(fieldName)
if err != nil {
return nil, err
}
if tuple, ok := val.(*Tuple); ok {
return tuple, nil
}
return nil, fmt.Errorf("field %s is not a tuple", fieldName)
}
// Serialize сериализует документ в MessagePack с поддержкой сжатия
func (d *Document) Serialize() ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
data, err := serializer.Marshal(d)
if err != nil {
return nil, err
}
return data, nil
}
// SerializeCompressed сериализует и сжимает документ
func (d *Document) SerializeCompressed(compressionConfig *compression.Config) ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// Сериализуем документ
data, err := serializer.Marshal(d)
if err != nil {
return nil, err
}
// Проверяем, нужно ли сжимать
if compressionConfig != nil && compressionConfig.Enabled && len(data) >= compressionConfig.MinSize {
compressed, err := compression.Compress(data, compressionConfig)
if err != nil {
// При ошибке сжатия возвращаем несжатые данные
return data, nil
}
return compressed, nil
}
return data, nil
}
// Deserialize десериализует документ из MessagePack (автоматически определяет сжатие)
func (d *Document) Deserialize(data []byte) error {
d.mu.Lock()
defer d.mu.Unlock()
// Пытаемся определить, сжаты ли данные
// Для этого пробуем распаковать, если не получается - данные несжатые
decompressed, err := compression.DecompressAuto(data)
if err == nil && len(decompressed) < len(data) {
// Данные были сжаты, используем распакованную версию
if err := serializer.Unmarshal(decompressed, d); err != nil {
return err
}
d.Compressed = true
d.OriginalSize = int64(len(decompressed))
} else {
// Данные не сжаты или не удалось распаковать
if err := serializer.Unmarshal(data, d); err != nil {
return err
}
d.Compressed = false
d.OriginalSize = 0
}
// Обновляем временные метки при десериализации
d.UpdatedAt = time.Now().UnixMilli()
return nil
}
// Clone создаёт глубокую копию документа
func (d *Document) Clone() *Document {
d.mu.RLock()
defer d.mu.RUnlock()
clone := &Document{
ID: d.ID,
Fields: make(map[string]interface{}),
CreatedAt: d.CreatedAt,
UpdatedAt: d.UpdatedAt,
Version: d.Version,
Compressed: d.Compressed,
OriginalSize: d.OriginalSize,
}
// Глубокое копирование полей
for k, v := range d.Fields {
clone.Fields[k] = deepCopyValue(v)
}
return clone
}
// Update применяет обновление к документу (атомарно)
func (d *Document) Update(updates map[string]interface{}) error {
d.mu.Lock()
defer d.mu.Unlock()
for k, v := range updates {
d.Fields[k] = v
}
d.UpdatedAt = time.Now().UnixMilli()
d.Version++
d.Compressed = false // После обновления документ больше не сжат
return nil
}
// Compress сжимает документ в памяти
func (d *Document) Compress(config *compression.Config) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.Compressed {
return nil
}
// Сохраняем текущее состояние
originalSize := len(d.Fields)
if originalSize < config.MinSize {
return nil // Не сжимаем маленькие документы
}
d.Compressed = true
d.OriginalSize = int64(originalSize)
return nil
}
// Decompress распаковывает документ в памяти
func (d *Document) Decompress() error {
d.mu.Lock()
defer d.mu.Unlock()
if !d.Compressed {
return nil
}
d.Compressed = false
d.OriginalSize = 0
return nil
}
// GetCompressionRatio возвращает коэффициент сжатия
func (d *Document) GetCompressionRatio() float64 {
d.mu.RLock()
defer d.mu.RUnlock()
if !d.Compressed || d.OriginalSize == 0 {
return 1.0
}
currentSize := len(d.Fields)
return float64(currentSize) / float64(d.OriginalSize)
}
// deepCopyValue выполняет глубокое копирование значения
func deepCopyValue(val interface{}) interface{} {
switch v := val.(type) {
case *Tuple:
return v.Clone()
case map[string]interface{}:
copy := make(map[string]interface{})
for k, val := range v {
copy[k] = deepCopyValue(val)
}
return copy
case []interface{}:
copy := make([]interface{}, len(v))
for i, val := range v {
copy[i] = deepCopyValue(val)
}
return copy
default:
return v
}
}
// NewTuple создаёт новый вложенный документ (кортеж)
func NewTuple() *Tuple {
return &Tuple{
Fields: make(map[string]interface{}),
}
}
// Set устанавливает поле во вложенном документе
func (t *Tuple) Set(name string, value interface{}) {
t.mu.Lock()
defer t.mu.Unlock()
t.Fields[name] = value
}
// Get возвращает поле из вложенного документа
func (t *Tuple) Get(name string) (interface{}, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if val, ok := t.Fields[name]; ok {
return val, nil
}
return nil, fmt.Errorf("tuple field not found: %s", name)
}
// Clone создаёт копию кортежа
func (t *Tuple) Clone() *Tuple {
t.mu.RLock()
defer t.mu.RUnlock()
clone := NewTuple()
for k, v := range t.Fields {
clone.Fields[k] = deepCopyValue(v)
}
return clone
}
// ToMap конвертирует кортеж в map
func (t *Tuple) ToMap() map[string]interface{} {
t.mu.RLock()
defer t.mu.RUnlock()
copy := make(map[string]interface{})
for k, v := range t.Fields {
copy[k] = v
}
return copy
}
// GetNestedField получает значение по точечному пути (например, "user.address.city")
func (d *Document) GetNestedField(path string) (interface{}, error) {
parts := strings.Split(path, ".")
if len(parts) == 0 {
return nil, fmt.Errorf("empty path")
}
current := interface{}(d)
for _, part := range parts {
switch v := current.(type) {
case *Document:
val, err := v.GetField(part)
if err != nil {
return nil, err
}
current = val
case *Tuple:
val, err := v.Get(part)
if err != nil {
return nil, err
}
current = val
case map[string]interface{}:
if val, ok := v[part]; ok {
current = val
} else {
return nil, fmt.Errorf("field not found: %s", part)
}
default:
return nil, fmt.Errorf("cannot navigate into non-document value at %s", part)
}
}
return current, nil
}
// SetNestedField устанавливает значение по точечному пути
func (d *Document) SetNestedField(path string, value interface{}) error {
parts := strings.Split(path, ".")
if len(parts) == 0 {
return fmt.Errorf("empty path")
}
// Если путь состоит из одного элемента, просто устанавливаем поле
if len(parts) == 1 {
d.SetField(parts[0], value)
return nil
}
// Иначе нужно создать промежуточные структуры
current := interface{}(d)
for i := 0; i < len(parts)-1; i++ {
part := parts[i]
switch v := current.(type) {
case *Document:
if !v.HasField(part) {
// Создаём новый кортеж, если поле не существует
newTuple := NewTuple()
v.SetField(part, newTuple)
current = newTuple
} else {
field, _ := v.GetField(part)
if tuple, ok := field.(*Tuple); ok {
current = tuple
} else {
return fmt.Errorf("field %s is not a tuple", part)
}
}
case *Tuple:
if val, err := v.Get(part); err == nil {
if tuple, ok := val.(*Tuple); ok {
current = tuple
} else {
return fmt.Errorf("field %s is not a tuple", part)
}
} else {
newTuple := NewTuple()
v.Set(part, newTuple)
current = newTuple
}
default:
return fmt.Errorf("cannot set nested field on non-document value")
}
}
// Устанавливаем значение в последний элемент пути
lastPart := parts[len(parts)-1]
switch v := current.(type) {
case *Document:
v.SetField(lastPart, value)
case *Tuple:
v.Set(lastPart, value)
default:
return fmt.Errorf("cannot set field on non-document value")
}
d.UpdatedAt = time.Now().UnixMilli()
d.Compressed = false
return nil
}

234
internal/storage/engine.go Normal file
View File

@@ -0,0 +1,234 @@
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/engine.go
// Назначение: In-memory движок хранения документов с поддержкой коллекций,
// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей.
// Полностью wait-free с использованием sync.Map и атомарных операций.
package storage
import (
"fmt"
"sync"
"sync/atomic"
"futriis/internal/log"
"futriis/internal/serializer"
)
// Storage представляет основное хранилище баз данных
type Storage struct {
databases sync.Map // map[string]*Database
pageSize int64
logger *log.Logger
totalDocs atomic.Int64
}
// Database представляет базу данных (аналог слайса в реляционных СУБД)
type Database struct {
name string
collections sync.Map // map[string]*Collection
}
// NewStorage создаёт новый экземпляр хранилища
func NewStorage(pageSizeMB int, logger *log.Logger) *Storage {
return &Storage{
pageSize: int64(pageSizeMB) * 1024 * 1024,
logger: logger,
}
}
// CreateDatabase создаёт новую базу данных
func (s *Storage) CreateDatabase(name string) error {
if _, exists := s.databases.LoadOrStore(name, &Database{name: name}); exists {
return fmt.Errorf("database already exists")
}
AuditDatabaseOperation("CREATE", name)
s.logger.Info("Database created: " + name)
return nil
}
// GetDatabase возвращает базу данных по имени
func (s *Storage) GetDatabase(name string) (*Database, error) {
if val, ok := s.databases.Load(name); ok {
return val.(*Database), nil
}
return nil, fmt.Errorf("database not found")
}
// DropDatabase удаляет базу данных
func (s *Storage) DropDatabase(name string) error {
if _, ok := s.databases.LoadAndDelete(name); !ok {
return fmt.Errorf("database not found")
}
AuditDatabaseOperation("DROP", name)
s.logger.Info("Database dropped: " + name)
return nil
}
// ListDatabases возвращает список всех баз данных
func (s *Storage) ListDatabases() []string {
databases := make([]string, 0)
s.databases.Range(func(key, value interface{}) bool {
databases = append(databases, key.(string))
return true
})
return databases
}
// Name возвращает имя базы данных
func (db *Database) Name() string {
return db.name
}
// CreateCollection создаёт новую коллекцию в базе данных
func (db *Database) CreateCollection(name string) error {
if _, exists := db.collections.LoadOrStore(name, NewCollection(name, nil)); exists {
return fmt.Errorf("collection already exists")
}
AuditCollectionOperation("CREATE", db.name, name, nil)
return nil
}
// CreateCollectionWithSettings создаёт коллекцию с настройками
func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error {
if _, exists := db.collections.LoadOrStore(name, NewCollection(name, settings)); exists {
return fmt.Errorf("collection already exists")
}
AuditCollectionOperation("CREATE", db.name, name, settings)
return nil
}
// GetCollection возвращает коллекцию по имени
func (db *Database) GetCollection(name string) (*Collection, error) {
if val, ok := db.collections.Load(name); ok {
return val.(*Collection), nil
}
return nil, fmt.Errorf("collection not found")
}
// DropCollection удаляет коллекцию
func (db *Database) DropCollection(name string) error {
if _, ok := db.collections.LoadAndDelete(name); !ok {
return fmt.Errorf("collection not found")
}
AuditCollectionOperation("DROP", db.name, name, nil)
return nil
}
// ListCollections возвращает список всех коллекций в базе данных
func (db *Database) ListCollections() []string {
collections := make([]string, 0)
db.collections.Range(func(key, value interface{}) bool {
collections = append(collections, key.(string))
return true
})
return collections
}
// GetTotalDocuments возвращает общее количество документов во всех коллекциях
func (s *Storage) GetTotalDocuments() int64 {
return s.totalDocs.Load()
}
// GetPageSize возвращает размер страницы памяти
func (s *Storage) GetPageSize() int64 {
return s.pageSize
}
// SerializeDatabase сериализует всю базу данных в MessagePack
func (db *Database) SerializeDatabase() ([]byte, error) {
dbData := make(map[string]interface{})
db.collections.Range(func(key, value interface{}) bool {
coll := value.(*Collection)
collData := make(map[string]interface{})
// Собираем все документы коллекции
docs := coll.GetAllDocuments()
collDocs := make([]*Document, 0, len(docs))
for _, doc := range docs {
collDocs = append(collDocs, doc)
}
collData["documents"] = collDocs
collData["metadata"] = coll.GetMetadata()
dbData[key.(string)] = collData
return true
})
return serializer.Marshal(dbData)
}
// DeserializeDatabase десериализует базу данных из MessagePack
func (db *Database) DeserializeDatabase(data []byte) error {
var dbData map[string]interface{}
if err := serializer.Unmarshal(data, &dbData); err != nil {
return err
}
for collName, collDataRaw := range dbData {
collData := collDataRaw.(map[string]interface{})
// Создаём коллекцию
settings := &CollectionSettings{
MaxDocuments: 0,
ValidateSchema: false,
AutoIndexID: true,
TTLSeconds: 0,
}
if metaRaw, ok := collData["metadata"]; ok {
if meta, ok := metaRaw.(*CollectionMetadata); ok {
if meta.Settings != nil {
settings = meta.Settings
}
}
}
coll := NewCollection(collName, settings)
// Восстанавливаем документы
if docsRaw, ok := collData["documents"]; ok {
if docs, ok := docsRaw.([]*Document); ok {
for _, doc := range docs {
coll.Insert(doc)
}
}
}
db.collections.Store(collName, coll)
AuditCollectionOperation("RESTORE", db.name, collName, settings)
}
return nil
}
// GetDatabaseNames возвращает имена всех баз данных
func (s *Storage) GetDatabaseNames() []string {
return s.ListDatabases()
}
// ExistsDatabase проверяет существование базы данных
func (s *Storage) ExistsDatabase(name string) bool {
_, ok := s.databases.Load(name)
return ok
}
// GetDatabaseCount возвращает количество баз данных
func (s *Storage) GetDatabaseCount() int {
count := 0
s.databases.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}

View File

@@ -0,0 +1,392 @@
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/transaction.go
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
package storage
import (
"encoding/binary"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"futriis/internal/serializer"
)
// TransactionID представляет уникальный идентификатор транзакции
type TransactionID uint64
// TransactionState представляет состояние транзакции
type TransactionState int32
const (
TransactionActive TransactionState = iota
TransactionCommitted
TransactionAborted
)
// TransactionRecord представляет запись в WAL
type TransactionRecord struct {
ID TransactionID `msgpack:"id"`
State TransactionState `msgpack:"state"`
Timestamp int64 `msgpack:"timestamp"`
Operations []Operation `msgpack:"operations"`
}
// Operation представляет одну операцию в транзакции
type Operation struct {
Type string `msgpack:"type"` // "insert", "update", "delete"
Database string `msgpack:"database"`
Collection string `msgpack:"collection"`
DocumentID string `msgpack:"document_id"`
Data map[string]interface{} `msgpack:"data"`
Version uint64 `msgpack:"version"`
}
// DocumentVersion представляет версию документа для MVCC
type DocumentVersion struct {
Document *Document `msgpack:"document"`
Timestamp int64 `msgpack:"timestamp"`
TxID TransactionID `msgpack:"tx_id"`
}
// TransactionManager управляет транзакциями
type TransactionManager struct {
activeTransactions sync.Map // map[TransactionID]*Transaction
nextTxID atomic.Uint64
wal *WriteAheadLog
mu sync.RWMutex
}
// Transaction представляет одну транзакцию
type Transaction struct {
ID TransactionID
State atomic.Int32
Operations []Operation
StartTime int64
mu sync.RWMutex
}
// WriteAheadLog реализует журнал предзаписи
type WriteAheadLog struct {
file *os.File
writeChan chan []byte
done chan struct{}
mu sync.RWMutex
}
var (
globalTxManager *TransactionManager
txManagerOnce sync.Once
currentTx atomic.Value // *Transaction
)
// InitTransactionManager инициализирует глобальный менеджер транзакций
func InitTransactionManager(walPath string) error {
var err error
txManagerOnce.Do(func() {
globalTxManager = &TransactionManager{
nextTxID: atomic.Uint64{},
}
globalTxManager.nextTxID.Store(1)
err = globalTxManager.initWAL(walPath)
})
return err
}
// initWAL инициализирует Write-Ahead Log
func (tm *TransactionManager) initWAL(walPath string) error {
wal, err := NewWriteAheadLog(walPath)
if err != nil {
return err
}
tm.wal = wal
// Восстанавливаем состояние из WAL при запуске
go tm.recoverFromWAL()
return nil
}
// NewWriteAheadLog создаёт новый WAL
func NewWriteAheadLog(path string) (*WriteAheadLog, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
wal := &WriteAheadLog{
file: file,
writeChan: make(chan []byte, 10000),
done: make(chan struct{}),
}
go wal.writerLoop()
return wal, nil
}
// writerLoop асинхронно записывает данные в WAL
func (wal *WriteAheadLog) writerLoop() {
for data := range wal.writeChan {
wal.mu.Lock()
// Формат записи: [длина (4 байта)][данные]
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
if _, err := wal.file.Write(lenBuf); err != nil {
continue
}
if _, err := wal.file.Write(data); err != nil {
continue
}
wal.file.Sync()
wal.mu.Unlock()
}
close(wal.done)
}
// Write записывает запись в WAL
func (wal *WriteAheadLog) Write(record *TransactionRecord) error {
data, err := serializer.Marshal(record)
if err != nil {
return err
}
select {
case wal.writeChan <- data:
return nil
default:
return fmt.Errorf("WAL buffer full")
}
}
// Close закрывает WAL
func (wal *WriteAheadLog) Close() error {
close(wal.writeChan)
<-wal.done
return wal.file.Close()
}
// BeginTransaction начинает новую транзакцию
func BeginTransaction() *Transaction {
if globalTxManager == nil {
InitTransactionManager("futriis.wal")
}
tx := &Transaction{
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
StartTime: time.Now().UnixMilli(),
Operations: make([]Operation, 0),
}
tx.State.Store(int32(TransactionActive))
globalTxManager.activeTransactions.Store(tx.ID, tx)
// Сохраняем как текущую транзакцию для горутины
currentTx.Store(tx)
// Записываем начало транзакции в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionActive,
Timestamp: tx.StartTime,
Operations: tx.Operations,
}
globalTxManager.wal.Write(record)
return tx
}
// AddToTransaction добавляет операцию в текущую транзакцию
func AddToTransaction(coll *Collection, opType string, doc *Document) error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
op := Operation{
Type: opType,
Database: coll.Name(), // В реальной реализации нужно передавать имя БД
Collection: coll.Name(),
DocumentID: doc.ID,
Data: doc.GetFields(),
Version: doc.Version,
}
tx.mu.Lock()
tx.Operations = append(tx.Operations, op)
tx.mu.Unlock()
return nil
}
// CommitCurrentTransaction коммитит текущую транзакцию
func CommitCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
// Применяем все операции атомарно
for _, op := range tx.Operations {
if err := applyOperation(op); err != nil {
// Откатываем при ошибке
AbortCurrentTransaction()
return fmt.Errorf("transaction commit failed: %v", err)
}
}
tx.State.Store(int32(TransactionCommitted))
// Записываем коммит в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionCommitted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
globalTxManager.wal.Write(record)
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// AbortCurrentTransaction откатывает текущую транзакцию
func AbortCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
tx.State.Store(int32(TransactionAborted))
// Записываем откат в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionAborted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
globalTxManager.wal.Write(record)
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// HasActiveTransaction проверяет наличие активной транзакции
func HasActiveTransaction() bool {
return currentTx.Load() != nil
}
// FindInTransaction ищет документ в контексте транзакции
func FindInTransaction(coll *Collection, id string) (*Document, error) {
txVal := currentTx.Load()
if txVal == nil {
return coll.Find(id)
}
tx := txVal.(*Transaction)
// Сначала ищем в операциях транзакции
for i := len(tx.Operations) - 1; i >= 0; i-- {
op := tx.Operations[i]
if op.DocumentID == id {
if op.Type == "delete" {
return nil, fmt.Errorf("key not found")
}
// Создаём документ из данных операции
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return doc, nil
}
}
// Ищем в основном хранилище
return coll.Find(id)
}
// applyOperation применяет операцию к хранилищу
func applyOperation(op Operation) error {
// В реальной реализации здесь будет применение операции к соответствующей коллекции
// С использованием MVCC для версионирования
switch op.Type {
case "insert":
// Проверяем версию документа (MVCC)
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
// Здесь должна быть вставка в коллекцию
return nil
case "update":
// Обновление с проверкой версии
return nil
case "delete":
// Удаление
return nil
}
return nil
}
// recoverFromWAL восстанавливает состояние из WAL после сбоя
func (tm *TransactionManager) recoverFromWAL() {
// В реальной реализации здесь будет чтение WAL и восстановление
// незавершённых транзакций
}
// GetTransaction возвращает транзакцию по ID
func GetTransaction(id TransactionID) (*Transaction, bool) {
if val, ok := globalTxManager.activeTransactions.Load(id); ok {
return val.(*Transaction), true
}
return nil, false
}
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
func MVCCSnapshot() uint64 {
return uint64(time.Now().UnixNano())
}
// CreateDocumentVersion создаёт новую версию документа для MVCC
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
return &DocumentVersion{
Document: doc.Clone(),
Timestamp: time.Now().UnixMilli(),
TxID: txID,
}
}

681
internal/storage/trigger.go Normal file
View File

@@ -0,0 +1,681 @@
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/trigger.go
// Назначение: Реализация триггеров, похожих на MongoDB trigger syntax.
// Поддерживает события: INSERT, UPDATE, DELETE, REPLACE.
// Триггеры могут выполняться до или после события.
package storage
import (
"fmt"
"regexp"
"strings"
"sync"
"time"
"futriis/internal/log"
)
// TriggerEvent определяет тип события для триггера
type TriggerEvent string
const (
TriggerBeforeInsert TriggerEvent = "BEFORE_INSERT"
TriggerAfterInsert TriggerEvent = "AFTER_INSERT"
TriggerBeforeUpdate TriggerEvent = "BEFORE_UPDATE"
TriggerAfterUpdate TriggerEvent = "AFTER_UPDATE"
TriggerBeforeDelete TriggerEvent = "BEFORE_DELETE"
TriggerAfterDelete TriggerEvent = "AFTER_DELETE"
TriggerBeforeReplace TriggerEvent = "BEFORE_REPLACE"
TriggerAfterReplace TriggerEvent = "AFTER_REPLACE"
)
// TriggerAction определяет действие триггера
type TriggerAction string
const (
ActionAbort TriggerAction = "abort" // Прервать операцию
ActionSkip TriggerAction = "skip" // Пропустить операцию
ActionModify TriggerAction = "modify" // Модифицировать документ
ActionLog TriggerAction = "log" // Записать в лог
ActionNotify TriggerAction = "notify" // Отправить уведомление
ActionCustom TriggerAction = "custom" // Пользовательское действие
)
// Trigger представляет триггер на коллекции
type Trigger struct {
Name string `msgpack:"name"`
Collection string `msgpack:"collection"`
Event TriggerEvent `msgpack:"event"`
Action TriggerAction `msgpack:"action"`
Condition *TriggerCondition `msgpack:"condition"`
Operations []TriggerOperation `msgpack:"operations"`
CreatedAt int64 `msgpack:"created_at"`
UpdatedAt int64 `msgpack:"updated_at"`
Enabled bool `msgpack:"enabled"`
Description string `msgpack:"description"`
mu sync.RWMutex `msgpack:"-"`
}
// TriggerCondition определяет условие выполнения триггера
type TriggerCondition struct {
Field string `msgpack:"field"` // Поле для проверки
Operator string `msgpack:"operator"` // Оператор: eq, ne, gt, lt, gte, lte, in, nin, exists, regex
Value interface{} `msgpack:"value"` // Значение для сравнения
Match string `msgpack:"match"` // Паттерн для regex
}
// TriggerOperation определяет операцию, выполняемую триггером
type TriggerOperation struct {
Type string `msgpack:"type"` // set, unset, inc, mul, rename, currentDate
Field string `msgpack:"field"` // Поле для операции
Value interface{} `msgpack:"value"` // Значение для операции
Params map[string]interface{} `msgpack:"params"`
}
// TriggerExecution содержит контекст выполнения триггера
type TriggerExecution struct {
TriggerName string
Event TriggerEvent
Collection string
Database string
DocumentID string
OldDocument *Document
NewDocument *Document
Operation string
Timestamp time.Time
User string
Role string
CustomData map[string]interface{}
}
// TriggerManager управляет триггерами в СУБД
type TriggerManager struct {
triggers sync.Map // map[string]*Trigger (ключ: collection|event|name)
logger *log.Logger
mu sync.RWMutex
auditLog []*TriggerExecution
maxLogSize int
}
var (
globalTriggerManager *TriggerManager
triggerManagerOnce sync.Once
)
// GetTriggerManager возвращает глобальный менеджер триггеров
func GetTriggerManager() *TriggerManager {
triggerManagerOnce.Do(func() {
globalTriggerManager = &TriggerManager{
maxLogSize: 10000,
auditLog: make([]*TriggerExecution, 0),
}
})
return globalTriggerManager
}
// InitTriggerManager инициализирует менеджер триггеров с логгером
func InitTriggerManager(logger *log.Logger) {
tm := GetTriggerManager()
tm.logger = logger
if logger != nil {
logger.Info("Trigger manager initialized")
}
}
// CreateTrigger создаёт новый триггер (синтаксис MongoDB-like)
// Пример: db.collection.createTrigger("triggerName", "BEFORE_INSERT", {
// condition: { field: "status", operator: "eq", value: "active" },
// action: "modify",
// operations: [
// { type: "set", field: "updated_at", value: "$$NOW" }
// ]
// })
func (tm *TriggerManager) CreateTrigger(database, collection, name string, event TriggerEvent, config map[string]interface{}) error {
tm.mu.Lock()
defer tm.mu.Unlock()
key := tm.getTriggerKey(collection, event, name)
if _, exists := tm.triggers.Load(key); exists {
return fmt.Errorf("trigger '%s' already exists on %s for event %s", name, collection, event)
}
trigger := &Trigger{
Name: name,
Collection: collection,
Event: event,
Enabled: true,
CreatedAt: time.Now().UnixMilli(),
UpdatedAt: time.Now().UnixMilli(),
Operations: make([]TriggerOperation, 0),
}
// Парсим конфигурацию триггера
if action, ok := config["action"].(string); ok {
switch strings.ToLower(action) {
case "abort":
trigger.Action = ActionAbort
case "skip":
trigger.Action = ActionSkip
case "modify":
trigger.Action = ActionModify
case "log":
trigger.Action = ActionLog
case "notify":
trigger.Action = ActionNotify
default:
trigger.Action = ActionCustom
}
}
// Парсим условие
if cond, ok := config["condition"].(map[string]interface{}); ok {
trigger.Condition = &TriggerCondition{}
if field, ok := cond["field"].(string); ok {
trigger.Condition.Field = field
}
if operator, ok := cond["operator"].(string); ok {
trigger.Condition.Operator = operator
}
if value, ok := cond["value"]; ok {
trigger.Condition.Value = value
}
if match, ok := cond["match"].(string); ok {
trigger.Condition.Match = match
}
}
// Парсим операции
if ops, ok := config["operations"].([]interface{}); ok {
for _, opRaw := range ops {
opMap, ok := opRaw.(map[string]interface{})
if !ok {
continue
}
operation := TriggerOperation{}
if opType, ok := opMap["type"].(string); ok {
operation.Type = opType
}
if field, ok := opMap["field"].(string); ok {
operation.Field = field
}
if value, ok := opMap["value"]; ok {
operation.Value = value
}
if params, ok := opMap["params"].(map[string]interface{}); ok {
operation.Params = params
}
trigger.Operations = append(trigger.Operations, operation)
}
}
if desc, ok := config["description"].(string); ok {
trigger.Description = desc
}
tm.triggers.Store(key, trigger)
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Trigger '%s' created on %s.%s for event %s", name, database, collection, event))
}
return nil
}
// DropTrigger удаляет триггер
func (tm *TriggerManager) DropTrigger(collection, event, name string) error {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
if _, exists := tm.triggers.LoadAndDelete(key); !exists {
return fmt.Errorf("trigger '%s' not found on %s for event %s", name, collection, event)
}
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Trigger '%s' dropped from %s for event %s", name, collection, event))
}
return nil
}
// GetTrigger возвращает триггер по имени
func (tm *TriggerManager) GetTrigger(collection, event, name string) (*Trigger, error) {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
if val, ok := tm.triggers.Load(key); ok {
return val.(*Trigger), nil
}
return nil, fmt.Errorf("trigger not found: %s", name)
}
// ListTriggers возвращает список всех триггеров для коллекции
func (tm *TriggerManager) ListTriggers(collection string) []*Trigger {
triggers := make([]*Trigger, 0)
tm.triggers.Range(func(key, value interface{}) bool {
trigger := value.(*Trigger)
if collection == "" || trigger.Collection == collection {
triggers = append(triggers, trigger)
}
return true
})
return triggers
}
// ListTriggersByEvent возвращает триггеры для конкретного события
func (tm *TriggerManager) ListTriggersByEvent(collection string, event TriggerEvent) []*Trigger {
triggers := make([]*Trigger, 0)
tm.triggers.Range(func(key, value interface{}) bool {
trigger := value.(*Trigger)
if trigger.Collection == collection && trigger.Event == event && trigger.Enabled {
triggers = append(triggers, trigger)
}
return true
})
return triggers
}
// EnableTrigger включает триггер
func (tm *TriggerManager) EnableTrigger(collection, event, name string) error {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
val, ok := tm.triggers.Load(key)
if !ok {
return fmt.Errorf("trigger not found: %s", name)
}
trigger := val.(*Trigger)
trigger.mu.Lock()
trigger.Enabled = true
trigger.UpdatedAt = time.Now().UnixMilli()
trigger.mu.Unlock()
tm.triggers.Store(key, trigger)
return nil
}
// DisableTrigger выключает триггер
func (tm *TriggerManager) DisableTrigger(collection, event, name string) error {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
val, ok := tm.triggers.Load(key)
if !ok {
return fmt.Errorf("trigger not found: %s", name)
}
trigger := val.(*Trigger)
trigger.mu.Lock()
trigger.Enabled = false
trigger.UpdatedAt = time.Now().UnixMilli()
trigger.mu.Unlock()
tm.triggers.Store(key, trigger)
return nil
}
// ExecuteTriggers выполняет все триггеры для данного события
// Возвращает: modifiedDocument, shouldAbort, error
func (tm *TriggerManager) ExecuteTriggers(execCtx *TriggerExecution) (*Document, bool, error) {
triggers := tm.ListTriggersByEvent(execCtx.Collection, execCtx.Event)
if len(triggers) == 0 {
return execCtx.NewDocument, false, nil
}
currentDoc := execCtx.NewDocument
if currentDoc == nil && execCtx.OldDocument != nil {
currentDoc = execCtx.OldDocument.Clone()
}
for _, trigger := range triggers {
if !trigger.Enabled {
continue
}
// Проверяем условие
if trigger.Condition != nil {
if !tm.evaluateCondition(execCtx, trigger.Condition) {
continue
}
}
// Выполняем действие триггера
switch trigger.Action {
case ActionAbort:
tm.logExecution(execCtx, trigger, "aborted")
return currentDoc, true, fmt.Errorf("operation aborted by trigger: %s", trigger.Name)
case ActionSkip:
tm.logExecution(execCtx, trigger, "skipped")
return currentDoc, true, nil
case ActionModify:
if currentDoc != nil {
currentDoc = tm.applyOperations(currentDoc, trigger.Operations, execCtx)
}
tm.logExecution(execCtx, trigger, "modified")
case ActionLog:
tm.logExecution(execCtx, trigger, "logged")
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Trigger %s executed on %s.%s (event: %s, doc: %s)",
trigger.Name, execCtx.Database, execCtx.Collection, execCtx.Event, execCtx.DocumentID))
}
case ActionNotify:
tm.logExecution(execCtx, trigger, "notified")
// Здесь можно отправить уведомление через WebSocket или другой канал
}
}
return currentDoc, false, nil
}
// evaluateCondition проверяет условие триггера
func (tm *TriggerManager) evaluateCondition(execCtx *TriggerExecution, cond *TriggerCondition) bool {
var docToCheck *Document
if execCtx.NewDocument != nil {
docToCheck = execCtx.NewDocument
} else if execCtx.OldDocument != nil {
docToCheck = execCtx.OldDocument
} else {
return false
}
fieldValue, err := docToCheck.GetField(cond.Field)
if err != nil {
// Поле не существует
if cond.Operator == "exists" {
if existsVal, ok := cond.Value.(bool); ok && !existsVal {
return true
}
}
return false
}
switch cond.Operator {
case "eq":
return fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", cond.Value)
case "ne":
return fmt.Sprintf("%v", fieldValue) != fmt.Sprintf("%v", cond.Value)
case "gt":
return compareNumbers(fieldValue, cond.Value) > 0
case "lt":
return compareNumbers(fieldValue, cond.Value) < 0
case "gte":
return compareNumbers(fieldValue, cond.Value) >= 0
case "lte":
return compareNumbers(fieldValue, cond.Value) <= 0
case "in":
if arr, ok := cond.Value.([]interface{}); ok {
for _, v := range arr {
if fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", v) {
return true
}
}
}
return false
case "nin":
if arr, ok := cond.Value.([]interface{}); ok {
for _, v := range arr {
if fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", v) {
return false
}
}
}
return true
case "exists":
if existsVal, ok := cond.Value.(bool); ok {
return existsVal
}
return true
case "regex":
if pattern, ok := cond.Value.(string); ok {
matched, _ := regexp.MatchString(pattern, fmt.Sprintf("%v", fieldValue))
return matched
}
return false
default:
return true
}
}
// applyOperations применяет операции к документу
func (tm *TriggerManager) applyOperations(doc *Document, ops []TriggerOperation, execCtx *TriggerExecution) *Document {
if doc == nil {
return nil
}
result := doc.Clone()
for _, op := range ops {
switch op.Type {
case "set":
value := tm.resolveValue(op.Value, execCtx)
result.SetField(op.Field, value)
case "unset":
result.DeleteField(op.Field)
case "inc":
if incVal, ok := toFloat64(op.Value); ok {
if current, err := result.GetField(op.Field); err == nil {
if currVal, ok := toFloat64(current); ok {
result.SetField(op.Field, currVal+incVal)
}
} else {
result.SetField(op.Field, incVal)
}
}
case "mul":
if mulVal, ok := toFloat64(op.Value); ok {
if current, err := result.GetField(op.Field); err == nil {
if currVal, ok := toFloat64(current); ok {
result.SetField(op.Field, currVal*mulVal)
}
}
}
case "rename":
if newName, ok := op.Value.(string); ok {
if val, err := result.GetField(op.Field); err == nil {
result.SetField(newName, val)
result.DeleteField(op.Field)
}
}
case "currentDate":
result.SetField(op.Field, time.Now().UnixMilli())
}
}
return result
}
// resolveValue разрешает специальные значения типа $$NOW, $$USER
func (tm *TriggerManager) resolveValue(value interface{}, execCtx *TriggerExecution) interface{} {
if strVal, ok := value.(string); ok {
switch strVal {
case "$$NOW":
return time.Now().UnixMilli()
case "$$USER":
if execCtx.User != "" {
return execCtx.User
}
return "anonymous"
case "$$ROLE":
if execCtx.Role != "" {
return execCtx.Role
}
return "anonymous"
}
}
return value
}
// logExecution логирует выполнение триггера
func (tm *TriggerManager) logExecution(execCtx *TriggerExecution, trigger *Trigger, result string) {
tm.mu.Lock()
defer tm.mu.Unlock()
execCtx.TriggerName = trigger.Name
execCtx.Timestamp = time.Now()
if len(tm.auditLog) >= tm.maxLogSize {
tm.auditLog = tm.auditLog[1:]
}
tm.auditLog = append(tm.auditLog, execCtx)
}
// GetTriggerExecutionLog возвращает лог выполнения триггеров
func (tm *TriggerManager) GetTriggerExecutionLog() []*TriggerExecution {
tm.mu.RLock()
defer tm.mu.RUnlock()
result := make([]*TriggerExecution, len(tm.auditLog))
copy(result, tm.auditLog)
return result
}
// getTriggerKey возвращает ключ для хранения триггера
func (tm *TriggerManager) getTriggerKey(collection string, event TriggerEvent, name string) string {
return fmt.Sprintf("%s|%s|%s", collection, event, name)
}
// compareNumbers сравнивает два числа
func compareNumbers(a, b interface{}) int {
aVal, aOk := toFloat64(a)
bVal, bOk := toFloat64(b)
if aOk && bOk {
if aVal < bVal {
return -1
}
if aVal > bVal {
return 1
}
return 0
}
return 0
}
// MongoDBLikeTriggerConfig создаёт конфигурацию триггера в стиле MongoDB
// Пример использования:
// config := MongoDBLikeTriggerConfig().
// On("BEFORE_INSERT").
// Condition("status", "eq", "active").
// Set("updated_at", "$$NOW").
// Build()
func MongoDBLikeTriggerConfig() *TriggerConfigBuilder {
return &TriggerConfigBuilder{
config: make(map[string]interface{}),
ops: make([]interface{}, 0),
}
}
// TriggerConfigBuilder строитель конфигурации триггера
type TriggerConfigBuilder struct {
config map[string]interface{}
ops []interface{}
}
// On устанавливает событие триггера
func (b *TriggerConfigBuilder) On(event string) *TriggerConfigBuilder {
b.config["event"] = event
return b
}
// Condition добавляет условие
func (b *TriggerConfigBuilder) Condition(field, operator string, value interface{}) *TriggerConfigBuilder {
b.config["condition"] = map[string]interface{}{
"field": field,
"operator": operator,
"value": value,
}
return b
}
// ConditionRegex добавляет regex условие
func (b *TriggerConfigBuilder) ConditionRegex(field, pattern string) *TriggerConfigBuilder {
b.config["condition"] = map[string]interface{}{
"field": field,
"operator": "regex",
"value": pattern,
}
return b
}
// Set добавляет операцию установки поля
func (b *TriggerConfigBuilder) Set(field string, value interface{}) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "set",
"field": field,
"value": value,
})
return b
}
// Unset добавляет операцию удаления поля
func (b *TriggerConfigBuilder) Unset(field string) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "unset",
"field": field,
})
return b
}
// Inc добавляет операцию инкремента
func (b *TriggerConfigBuilder) Inc(field string, value float64) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "inc",
"field": field,
"value": value,
})
return b
}
// Mul добавляет операцию умножения
func (b *TriggerConfigBuilder) Mul(field string, value float64) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "mul",
"field": field,
"value": value,
})
return b
}
// Rename добавляет операцию переименования поля
func (b *TriggerConfigBuilder) Rename(oldName, newName string) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "rename",
"field": oldName,
"value": newName,
})
return b
}
// CurrentDate добавляет операцию установки текущей даты
func (b *TriggerConfigBuilder) CurrentDate(field string) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "currentDate",
"field": field,
})
return b
}
// Action устанавливает действие триггера
func (b *TriggerConfigBuilder) Action(action string) *TriggerConfigBuilder {
b.config["action"] = action
return b
}
// Description устанавливает описание триггера
func (b *TriggerConfigBuilder) Description(desc string) *TriggerConfigBuilder {
b.config["description"] = desc
return b
}
// Build собирает конфигурацию
func (b *TriggerConfigBuilder) Build() map[string]interface{} {
b.config["operations"] = b.ops
return b.config
}