Files
futriix/internal/storage/trigger.go
2026-04-19 16:42:41 +03:00

682 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/trigger.go
// Назначение: Реализация триггеров, похожих на MongoDB trigger syntax.
// Поддерживает события: INSERT, UPDATE, DELETE, REPLACE.
// Триггеры могут выполняться до или после события.
package storage
import (
"fmt"
"regexp"
"strings"
"sync"
"time"
"futriis/internal/log"
)
// TriggerEvent определяет тип события для триггера
type TriggerEvent string
const (
TriggerBeforeInsert TriggerEvent = "BEFORE_INSERT"
TriggerAfterInsert TriggerEvent = "AFTER_INSERT"
TriggerBeforeUpdate TriggerEvent = "BEFORE_UPDATE"
TriggerAfterUpdate TriggerEvent = "AFTER_UPDATE"
TriggerBeforeDelete TriggerEvent = "BEFORE_DELETE"
TriggerAfterDelete TriggerEvent = "AFTER_DELETE"
TriggerBeforeReplace TriggerEvent = "BEFORE_REPLACE"
TriggerAfterReplace TriggerEvent = "AFTER_REPLACE"
)
// TriggerAction определяет действие триггера
type TriggerAction string
const (
ActionAbort TriggerAction = "abort" // Прервать операцию
ActionSkip TriggerAction = "skip" // Пропустить операцию
ActionModify TriggerAction = "modify" // Модифицировать документ
ActionLog TriggerAction = "log" // Записать в лог
ActionNotify TriggerAction = "notify" // Отправить уведомление
ActionCustom TriggerAction = "custom" // Пользовательское действие
)
// Trigger представляет триггер на коллекции
type Trigger struct {
Name string `msgpack:"name"`
Collection string `msgpack:"collection"`
Event TriggerEvent `msgpack:"event"`
Action TriggerAction `msgpack:"action"`
Condition *TriggerCondition `msgpack:"condition"`
Operations []TriggerOperation `msgpack:"operations"`
CreatedAt int64 `msgpack:"created_at"`
UpdatedAt int64 `msgpack:"updated_at"`
Enabled bool `msgpack:"enabled"`
Description string `msgpack:"description"`
mu sync.RWMutex `msgpack:"-"`
}
// TriggerCondition определяет условие выполнения триггера
type TriggerCondition struct {
Field string `msgpack:"field"` // Поле для проверки
Operator string `msgpack:"operator"` // Оператор: eq, ne, gt, lt, gte, lte, in, nin, exists, regex
Value interface{} `msgpack:"value"` // Значение для сравнения
Match string `msgpack:"match"` // Паттерн для regex
}
// TriggerOperation определяет операцию, выполняемую триггером
type TriggerOperation struct {
Type string `msgpack:"type"` // set, unset, inc, mul, rename, currentDate
Field string `msgpack:"field"` // Поле для операции
Value interface{} `msgpack:"value"` // Значение для операции
Params map[string]interface{} `msgpack:"params"`
}
// TriggerExecution содержит контекст выполнения триггера
type TriggerExecution struct {
TriggerName string
Event TriggerEvent
Collection string
Database string
DocumentID string
OldDocument *Document
NewDocument *Document
Operation string
Timestamp time.Time
User string
Role string
CustomData map[string]interface{}
}
// TriggerManager управляет триггерами в СУБД
type TriggerManager struct {
triggers sync.Map // map[string]*Trigger (ключ: collection|event|name)
logger *log.Logger
mu sync.RWMutex
auditLog []*TriggerExecution
maxLogSize int
}
var (
globalTriggerManager *TriggerManager
triggerManagerOnce sync.Once
)
// GetTriggerManager возвращает глобальный менеджер триггеров
func GetTriggerManager() *TriggerManager {
triggerManagerOnce.Do(func() {
globalTriggerManager = &TriggerManager{
maxLogSize: 10000,
auditLog: make([]*TriggerExecution, 0),
}
})
return globalTriggerManager
}
// InitTriggerManager инициализирует менеджер триггеров с логгером
func InitTriggerManager(logger *log.Logger) {
tm := GetTriggerManager()
tm.logger = logger
if logger != nil {
logger.Info("Trigger manager initialized")
}
}
// CreateTrigger создаёт новый триггер (синтаксис MongoDB-like)
// Пример: db.collection.createTrigger("triggerName", "BEFORE_INSERT", {
// condition: { field: "status", operator: "eq", value: "active" },
// action: "modify",
// operations: [
// { type: "set", field: "updated_at", value: "$$NOW" }
// ]
// })
func (tm *TriggerManager) CreateTrigger(database, collection, name string, event TriggerEvent, config map[string]interface{}) error {
tm.mu.Lock()
defer tm.mu.Unlock()
key := tm.getTriggerKey(collection, event, name)
if _, exists := tm.triggers.Load(key); exists {
return fmt.Errorf("trigger '%s' already exists on %s for event %s", name, collection, event)
}
trigger := &Trigger{
Name: name,
Collection: collection,
Event: event,
Enabled: true,
CreatedAt: time.Now().UnixMilli(),
UpdatedAt: time.Now().UnixMilli(),
Operations: make([]TriggerOperation, 0),
}
// Парсим конфигурацию триггера
if action, ok := config["action"].(string); ok {
switch strings.ToLower(action) {
case "abort":
trigger.Action = ActionAbort
case "skip":
trigger.Action = ActionSkip
case "modify":
trigger.Action = ActionModify
case "log":
trigger.Action = ActionLog
case "notify":
trigger.Action = ActionNotify
default:
trigger.Action = ActionCustom
}
}
// Парсим условие
if cond, ok := config["condition"].(map[string]interface{}); ok {
trigger.Condition = &TriggerCondition{}
if field, ok := cond["field"].(string); ok {
trigger.Condition.Field = field
}
if operator, ok := cond["operator"].(string); ok {
trigger.Condition.Operator = operator
}
if value, ok := cond["value"]; ok {
trigger.Condition.Value = value
}
if match, ok := cond["match"].(string); ok {
trigger.Condition.Match = match
}
}
// Парсим операции
if ops, ok := config["operations"].([]interface{}); ok {
for _, opRaw := range ops {
opMap, ok := opRaw.(map[string]interface{})
if !ok {
continue
}
operation := TriggerOperation{}
if opType, ok := opMap["type"].(string); ok {
operation.Type = opType
}
if field, ok := opMap["field"].(string); ok {
operation.Field = field
}
if value, ok := opMap["value"]; ok {
operation.Value = value
}
if params, ok := opMap["params"].(map[string]interface{}); ok {
operation.Params = params
}
trigger.Operations = append(trigger.Operations, operation)
}
}
if desc, ok := config["description"].(string); ok {
trigger.Description = desc
}
tm.triggers.Store(key, trigger)
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Trigger '%s' created on %s.%s for event %s", name, database, collection, event))
}
return nil
}
// DropTrigger удаляет триггер
func (tm *TriggerManager) DropTrigger(collection, event, name string) error {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
if _, exists := tm.triggers.LoadAndDelete(key); !exists {
return fmt.Errorf("trigger '%s' not found on %s for event %s", name, collection, event)
}
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Trigger '%s' dropped from %s for event %s", name, collection, event))
}
return nil
}
// GetTrigger возвращает триггер по имени
func (tm *TriggerManager) GetTrigger(collection, event, name string) (*Trigger, error) {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
if val, ok := tm.triggers.Load(key); ok {
return val.(*Trigger), nil
}
return nil, fmt.Errorf("trigger not found: %s", name)
}
// ListTriggers возвращает список всех триггеров для коллекции
func (tm *TriggerManager) ListTriggers(collection string) []*Trigger {
triggers := make([]*Trigger, 0)
tm.triggers.Range(func(key, value interface{}) bool {
trigger := value.(*Trigger)
if collection == "" || trigger.Collection == collection {
triggers = append(triggers, trigger)
}
return true
})
return triggers
}
// ListTriggersByEvent возвращает триггеры для конкретного события
func (tm *TriggerManager) ListTriggersByEvent(collection string, event TriggerEvent) []*Trigger {
triggers := make([]*Trigger, 0)
tm.triggers.Range(func(key, value interface{}) bool {
trigger := value.(*Trigger)
if trigger.Collection == collection && trigger.Event == event && trigger.Enabled {
triggers = append(triggers, trigger)
}
return true
})
return triggers
}
// EnableTrigger включает триггер
func (tm *TriggerManager) EnableTrigger(collection, event, name string) error {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
val, ok := tm.triggers.Load(key)
if !ok {
return fmt.Errorf("trigger not found: %s", name)
}
trigger := val.(*Trigger)
trigger.mu.Lock()
trigger.Enabled = true
trigger.UpdatedAt = time.Now().UnixMilli()
trigger.mu.Unlock()
tm.triggers.Store(key, trigger)
return nil
}
// DisableTrigger выключает триггер
func (tm *TriggerManager) DisableTrigger(collection, event, name string) error {
key := tm.getTriggerKey(collection, TriggerEvent(event), name)
val, ok := tm.triggers.Load(key)
if !ok {
return fmt.Errorf("trigger not found: %s", name)
}
trigger := val.(*Trigger)
trigger.mu.Lock()
trigger.Enabled = false
trigger.UpdatedAt = time.Now().UnixMilli()
trigger.mu.Unlock()
tm.triggers.Store(key, trigger)
return nil
}
// ExecuteTriggers выполняет все триггеры для данного события
// Возвращает: modifiedDocument, shouldAbort, error
func (tm *TriggerManager) ExecuteTriggers(execCtx *TriggerExecution) (*Document, bool, error) {
triggers := tm.ListTriggersByEvent(execCtx.Collection, execCtx.Event)
if len(triggers) == 0 {
return execCtx.NewDocument, false, nil
}
currentDoc := execCtx.NewDocument
if currentDoc == nil && execCtx.OldDocument != nil {
currentDoc = execCtx.OldDocument.Clone()
}
for _, trigger := range triggers {
if !trigger.Enabled {
continue
}
// Проверяем условие
if trigger.Condition != nil {
if !tm.evaluateCondition(execCtx, trigger.Condition) {
continue
}
}
// Выполняем действие триггера
switch trigger.Action {
case ActionAbort:
tm.logExecution(execCtx, trigger, "aborted")
return currentDoc, true, fmt.Errorf("operation aborted by trigger: %s", trigger.Name)
case ActionSkip:
tm.logExecution(execCtx, trigger, "skipped")
return currentDoc, true, nil
case ActionModify:
if currentDoc != nil {
currentDoc = tm.applyOperations(currentDoc, trigger.Operations, execCtx)
}
tm.logExecution(execCtx, trigger, "modified")
case ActionLog:
tm.logExecution(execCtx, trigger, "logged")
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Trigger %s executed on %s.%s (event: %s, doc: %s)",
trigger.Name, execCtx.Database, execCtx.Collection, execCtx.Event, execCtx.DocumentID))
}
case ActionNotify:
tm.logExecution(execCtx, trigger, "notified")
// Здесь можно отправить уведомление через WebSocket или другой канал
}
}
return currentDoc, false, nil
}
// evaluateCondition проверяет условие триггера
func (tm *TriggerManager) evaluateCondition(execCtx *TriggerExecution, cond *TriggerCondition) bool {
var docToCheck *Document
if execCtx.NewDocument != nil {
docToCheck = execCtx.NewDocument
} else if execCtx.OldDocument != nil {
docToCheck = execCtx.OldDocument
} else {
return false
}
fieldValue, err := docToCheck.GetField(cond.Field)
if err != nil {
// Поле не существует
if cond.Operator == "exists" {
if existsVal, ok := cond.Value.(bool); ok && !existsVal {
return true
}
}
return false
}
switch cond.Operator {
case "eq":
return fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", cond.Value)
case "ne":
return fmt.Sprintf("%v", fieldValue) != fmt.Sprintf("%v", cond.Value)
case "gt":
return compareNumbers(fieldValue, cond.Value) > 0
case "lt":
return compareNumbers(fieldValue, cond.Value) < 0
case "gte":
return compareNumbers(fieldValue, cond.Value) >= 0
case "lte":
return compareNumbers(fieldValue, cond.Value) <= 0
case "in":
if arr, ok := cond.Value.([]interface{}); ok {
for _, v := range arr {
if fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", v) {
return true
}
}
}
return false
case "nin":
if arr, ok := cond.Value.([]interface{}); ok {
for _, v := range arr {
if fmt.Sprintf("%v", fieldValue) == fmt.Sprintf("%v", v) {
return false
}
}
}
return true
case "exists":
if existsVal, ok := cond.Value.(bool); ok {
return existsVal
}
return true
case "regex":
if pattern, ok := cond.Value.(string); ok {
matched, _ := regexp.MatchString(pattern, fmt.Sprintf("%v", fieldValue))
return matched
}
return false
default:
return true
}
}
// applyOperations применяет операции к документу
func (tm *TriggerManager) applyOperations(doc *Document, ops []TriggerOperation, execCtx *TriggerExecution) *Document {
if doc == nil {
return nil
}
result := doc.Clone()
for _, op := range ops {
switch op.Type {
case "set":
value := tm.resolveValue(op.Value, execCtx)
result.SetField(op.Field, value)
case "unset":
result.DeleteField(op.Field)
case "inc":
if incVal, ok := toFloat64(op.Value); ok {
if current, err := result.GetField(op.Field); err == nil {
if currVal, ok := toFloat64(current); ok {
result.SetField(op.Field, currVal+incVal)
}
} else {
result.SetField(op.Field, incVal)
}
}
case "mul":
if mulVal, ok := toFloat64(op.Value); ok {
if current, err := result.GetField(op.Field); err == nil {
if currVal, ok := toFloat64(current); ok {
result.SetField(op.Field, currVal*mulVal)
}
}
}
case "rename":
if newName, ok := op.Value.(string); ok {
if val, err := result.GetField(op.Field); err == nil {
result.SetField(newName, val)
result.DeleteField(op.Field)
}
}
case "currentDate":
result.SetField(op.Field, time.Now().UnixMilli())
}
}
return result
}
// resolveValue разрешает специальные значения типа $$NOW, $$USER
func (tm *TriggerManager) resolveValue(value interface{}, execCtx *TriggerExecution) interface{} {
if strVal, ok := value.(string); ok {
switch strVal {
case "$$NOW":
return time.Now().UnixMilli()
case "$$USER":
if execCtx.User != "" {
return execCtx.User
}
return "anonymous"
case "$$ROLE":
if execCtx.Role != "" {
return execCtx.Role
}
return "anonymous"
}
}
return value
}
// logExecution логирует выполнение триггера
func (tm *TriggerManager) logExecution(execCtx *TriggerExecution, trigger *Trigger, result string) {
tm.mu.Lock()
defer tm.mu.Unlock()
execCtx.TriggerName = trigger.Name
execCtx.Timestamp = time.Now()
if len(tm.auditLog) >= tm.maxLogSize {
tm.auditLog = tm.auditLog[1:]
}
tm.auditLog = append(tm.auditLog, execCtx)
}
// GetTriggerExecutionLog возвращает лог выполнения триггеров
func (tm *TriggerManager) GetTriggerExecutionLog() []*TriggerExecution {
tm.mu.RLock()
defer tm.mu.RUnlock()
result := make([]*TriggerExecution, len(tm.auditLog))
copy(result, tm.auditLog)
return result
}
// getTriggerKey возвращает ключ для хранения триггера
func (tm *TriggerManager) getTriggerKey(collection string, event TriggerEvent, name string) string {
return fmt.Sprintf("%s|%s|%s", collection, event, name)
}
// compareNumbers сравнивает два числа
func compareNumbers(a, b interface{}) int {
aVal, aOk := toFloat64(a)
bVal, bOk := toFloat64(b)
if aOk && bOk {
if aVal < bVal {
return -1
}
if aVal > bVal {
return 1
}
return 0
}
return 0
}
// MongoDBLikeTriggerConfig создаёт конфигурацию триггера в стиле MongoDB
// Пример использования:
// config := MongoDBLikeTriggerConfig().
// On("BEFORE_INSERT").
// Condition("status", "eq", "active").
// Set("updated_at", "$$NOW").
// Build()
func MongoDBLikeTriggerConfig() *TriggerConfigBuilder {
return &TriggerConfigBuilder{
config: make(map[string]interface{}),
ops: make([]interface{}, 0),
}
}
// TriggerConfigBuilder строитель конфигурации триггера
type TriggerConfigBuilder struct {
config map[string]interface{}
ops []interface{}
}
// On устанавливает событие триггера
func (b *TriggerConfigBuilder) On(event string) *TriggerConfigBuilder {
b.config["event"] = event
return b
}
// Condition добавляет условие
func (b *TriggerConfigBuilder) Condition(field, operator string, value interface{}) *TriggerConfigBuilder {
b.config["condition"] = map[string]interface{}{
"field": field,
"operator": operator,
"value": value,
}
return b
}
// ConditionRegex добавляет regex условие
func (b *TriggerConfigBuilder) ConditionRegex(field, pattern string) *TriggerConfigBuilder {
b.config["condition"] = map[string]interface{}{
"field": field,
"operator": "regex",
"value": pattern,
}
return b
}
// Set добавляет операцию установки поля
func (b *TriggerConfigBuilder) Set(field string, value interface{}) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "set",
"field": field,
"value": value,
})
return b
}
// Unset добавляет операцию удаления поля
func (b *TriggerConfigBuilder) Unset(field string) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "unset",
"field": field,
})
return b
}
// Inc добавляет операцию инкремента
func (b *TriggerConfigBuilder) Inc(field string, value float64) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "inc",
"field": field,
"value": value,
})
return b
}
// Mul добавляет операцию умножения
func (b *TriggerConfigBuilder) Mul(field string, value float64) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "mul",
"field": field,
"value": value,
})
return b
}
// Rename добавляет операцию переименования поля
func (b *TriggerConfigBuilder) Rename(oldName, newName string) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "rename",
"field": oldName,
"value": newName,
})
return b
}
// CurrentDate добавляет операцию установки текущей даты
func (b *TriggerConfigBuilder) CurrentDate(field string) *TriggerConfigBuilder {
b.ops = append(b.ops, map[string]interface{}{
"type": "currentDate",
"field": field,
})
return b
}
// Action устанавливает действие триггера
func (b *TriggerConfigBuilder) Action(action string) *TriggerConfigBuilder {
b.config["action"] = action
return b
}
// Description устанавливает описание триггера
func (b *TriggerConfigBuilder) Description(desc string) *TriggerConfigBuilder {
b.config["description"] = desc
return b
}
// Build собирает конфигурацию
func (b *TriggerConfigBuilder) Build() map[string]interface{} {
b.config["operations"] = b.ops
return b.config
}