/* * Copyright 2026 Safronov Grigorii * * Licensed under the CDDL, Version 1.0 (the "License"); * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * https://opensource.org/licenses/CDDL-1.0 */ // Файл: internal/storage/transaction.go // Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) // и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. package storage import ( "encoding/binary" "fmt" "os" "sync" "sync/atomic" "time" "futriis/internal/serializer" ) // TransactionID представляет уникальный идентификатор транзакции type TransactionID uint64 // TransactionState представляет состояние транзакции type TransactionState int32 const ( TransactionActive TransactionState = iota TransactionCommitted TransactionAborted ) // TransactionRecord представляет запись в WAL type TransactionRecord struct { ID TransactionID `msgpack:"id"` State TransactionState `msgpack:"state"` Timestamp int64 `msgpack:"timestamp"` Operations []Operation `msgpack:"operations"` } // Operation представляет одну операцию в транзакции type Operation struct { Type string `msgpack:"type"` // "insert", "update", "delete" Database string `msgpack:"database"` Collection string `msgpack:"collection"` DocumentID string `msgpack:"document_id"` Data map[string]interface{} `msgpack:"data"` Version uint64 `msgpack:"version"` OldData map[string]interface{} `msgpack:"old_data"` // для отката } // DocumentVersion представляет версию документа для MVCC type DocumentVersion struct { Document *Document `msgpack:"document"` Timestamp int64 `msgpack:"timestamp"` TxID TransactionID `msgpack:"tx_id"` } // TransactionInfo представляет информацию о транзакции для API type TransactionInfo struct { ID string `json:"id"` Status string `json:"status"` StartTime int64 `json:"start_time"` OperationCount int `json:"operation_count"` Operations []OperationInfo `json:"operations,omitempty"` } // OperationInfo представляет информацию об операции для API type OperationInfo struct { Type string `json:"type"` Database string `json:"database"` Collection string `json:"collection"` DocumentID string `json:"document_id"` } // LoggerInterface определяет интерфейс для логирования type LoggerInterface interface { Debug(msg string) Info(msg string) Error(msg string) Warn(msg string) } // TransactionManager управляет транзакциями type TransactionManager struct { activeTransactions sync.Map // map[TransactionID]*Transaction nextTxID atomic.Uint64 wal *WriteAheadLog logger LoggerInterface mu sync.RWMutex walPath string snapshotInterval int64 // интервал создания снапшотов в секундах lastSnapshot int64 } // Transaction представляет одну транзакцию type Transaction struct { ID TransactionID State atomic.Int32 Operations []Operation StartTime int64 mu sync.RWMutex } // WriteAheadLog реализует журнал предзаписи type WriteAheadLog struct { file *os.File writeChan chan []byte done chan struct{} mu sync.RWMutex syncInterval time.Duration lastSync time.Time } var ( globalTxManager *TransactionManager txManagerOnce sync.Once currentTx atomic.Value // *Transaction ) // InitTransactionManager инициализирует глобальный менеджер транзакций func InitTransactionManager(walPath string) error { var err error txManagerOnce.Do(func() { globalTxManager = &TransactionManager{ nextTxID: atomic.Uint64{}, walPath: walPath, snapshotInterval: 300, // 5 минут lastSnapshot: time.Now().Unix(), } globalTxManager.nextTxID.Store(1) err = globalTxManager.initWAL(walPath) if err == nil { // Восстанавливаем состояние из WAL go globalTxManager.recoverFromWAL() // Запускаем периодическое создание снапшотов go globalTxManager.snapshotLoop() } }) return err } // SetTransactionLogger устанавливает логгер для транзакций func SetTransactionLogger(logger LoggerInterface) { if globalTxManager != nil { globalTxManager.logger = logger } } // initWAL инициализирует Write-Ahead Log func (tm *TransactionManager) initWAL(walPath string) error { wal, err := NewWriteAheadLog(walPath) if err != nil { return err } tm.wal = wal return nil } // NewWriteAheadLog создаёт новый WAL func NewWriteAheadLog(path string) (*WriteAheadLog, error) { file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) if err != nil { return nil, err } wal := &WriteAheadLog{ file: file, writeChan: make(chan []byte, 10000), done: make(chan struct{}), syncInterval: 5 * time.Second, lastSync: time.Now(), } go wal.writerLoop() return wal, nil } // writerLoop асинхронно записывает данные в WAL func (wal *WriteAheadLog) writerLoop() { batch := make([][]byte, 0, 100) ticker := time.NewTicker(wal.syncInterval) defer ticker.Stop() for { select { case data, ok := <-wal.writeChan: if !ok { if len(batch) > 0 { wal.flushBatch(batch) } close(wal.done) return } batch = append(batch, data) if len(batch) >= 100 { wal.flushBatch(batch) batch = batch[:0] } case <-ticker.C: if len(batch) > 0 { wal.flushBatch(batch) batch = batch[:0] } wal.sync() } } } // flushBatch записывает пакет данных в файл func (wal *WriteAheadLog) flushBatch(batch [][]byte) { wal.mu.Lock() defer wal.mu.Unlock() for _, data := range batch { lenBuf := make([]byte, 4) binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) if _, err := wal.file.Write(lenBuf); err != nil { continue } if _, err := wal.file.Write(data); err != nil { continue } } } // sync синхронизирует файл с диском func (wal *WriteAheadLog) sync() { wal.mu.Lock() defer wal.mu.Unlock() if time.Since(wal.lastSync) >= wal.syncInterval { wal.file.Sync() wal.lastSync = time.Now() } } // Write записывает запись в WAL func (wal *WriteAheadLog) Write(record *TransactionRecord) error { data, err := serializer.Marshal(record) if err != nil { return err } select { case wal.writeChan <- data: return nil case <-time.After(5 * time.Second): return fmt.Errorf("WAL write timeout") } } // Close закрывает WAL func (wal *WriteAheadLog) Close() error { close(wal.writeChan) <-wal.done return wal.file.Close() } // ReadAll читает все записи из WAL func (wal *WriteAheadLog) ReadAll() ([]*TransactionRecord, error) { wal.mu.RLock() defer wal.mu.RUnlock() if _, err := wal.file.Seek(0, 0); err != nil { return nil, err } records := make([]*TransactionRecord, 0) buf := make([]byte, 4) for { _, err := wal.file.Read(buf) if err != nil { break } recordLen := binary.BigEndian.Uint32(buf) recordData := make([]byte, recordLen) _, err = wal.file.Read(recordData) if err != nil { break } var record TransactionRecord if err := serializer.Unmarshal(recordData, &record); err != nil { continue } records = append(records, &record) } return records, nil } // BeginTransaction начинает новую транзакцию func BeginTransaction() *Transaction { if globalTxManager == nil { InitTransactionManager("futriis.wal") } tx := &Transaction{ ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), StartTime: time.Now().UnixMilli(), Operations: make([]Operation, 0), } tx.State.Store(int32(TransactionActive)) globalTxManager.activeTransactions.Store(tx.ID, tx) currentTx.Store(tx) // Записываем начало транзакции в WAL record := &TransactionRecord{ ID: tx.ID, State: TransactionActive, Timestamp: tx.StartTime, Operations: tx.Operations, } if globalTxManager.wal != nil { globalTxManager.wal.Write(record) } if globalTxManager.logger != nil { globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID)) } AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ "start_time": tx.StartTime, }) return tx } // BeginTransactionOnCollection начинает транзакцию для конкретной коллекции func BeginTransactionOnCollection(coll *Collection) error { if globalTxManager == nil { if err := InitTransactionManager("futriis.wal"); err != nil { return err } } tx := BeginTransaction() if tx == nil { return fmt.Errorf("failed to create transaction") } if globalTxManager.logger != nil { globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name)) } return nil } // AddToTransaction добавляет операцию в текущую транзакцию func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) if TransactionState(tx.State.Load()) != TransactionActive { return fmt.Errorf("transaction is not active") } op := Operation{ Type: opType, Database: coll.dbName, Collection: coll.name, DocumentID: docID, Data: newData, OldData: oldData, Version: version, } tx.mu.Lock() tx.Operations = append(tx.Operations, op) tx.mu.Unlock() if globalTxManager.logger != nil { globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID)) } return nil } // CommitCurrentTransaction коммитит текущую транзакцию func CommitCurrentTransaction() error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) if TransactionState(tx.State.Load()) != TransactionActive { return fmt.Errorf("transaction is not active") } // Применяем все операции атомарно for _, op := range tx.Operations { if err := applyOperation(op); err != nil { // Откатываем при ошибке AbortCurrentTransaction() return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err) } } tx.State.Store(int32(TransactionCommitted)) // Записываем коммит в WAL record := &TransactionRecord{ ID: tx.ID, State: TransactionCommitted, Timestamp: time.Now().UnixMilli(), Operations: tx.Operations, } if globalTxManager.wal != nil { globalTxManager.wal.Write(record) } if globalTxManager.logger != nil { globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations))) } AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ "operations": len(tx.Operations), }) // Очищаем текущую транзакцию currentTx.Store(nil) globalTxManager.activeTransactions.Delete(tx.ID) return nil } // AbortCurrentTransaction откатывает текущую транзакцию func AbortCurrentTransaction() error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) tx.State.Store(int32(TransactionAborted)) // Записываем откат в WAL record := &TransactionRecord{ ID: tx.ID, State: TransactionAborted, Timestamp: time.Now().UnixMilli(), Operations: tx.Operations, } if globalTxManager.wal != nil { globalTxManager.wal.Write(record) } if globalTxManager.logger != nil { globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations))) } AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ "operations": len(tx.Operations), }) // Очищаем текущую транзакцию currentTx.Store(nil) globalTxManager.activeTransactions.Delete(tx.ID) return nil } // HasActiveTransaction проверяет наличие активной транзакции func HasActiveTransaction() bool { return currentTx.Load() != nil } // GetCurrentTransactionID возвращает ID текущей транзакции func GetCurrentTransactionID() string { txVal := currentTx.Load() if txVal == nil { return "" } tx := txVal.(*Transaction) return fmt.Sprintf("%d", tx.ID) } // GetActiveTransactions возвращает список активных транзакций для API func GetActiveTransactions() []TransactionInfo { if globalTxManager == nil { return []TransactionInfo{} } transactions := make([]TransactionInfo, 0) globalTxManager.activeTransactions.Range(func(key, value interface{}) bool { tx := value.(*Transaction) status := "active" if TransactionState(tx.State.Load()) == TransactionCommitted { status = "committed" } else if TransactionState(tx.State.Load()) == TransactionAborted { status = "aborted" } tx.mu.RLock() opCount := len(tx.Operations) operations := make([]OperationInfo, 0, opCount) for _, op := range tx.Operations { operations = append(operations, OperationInfo{ Type: op.Type, Database: op.Database, Collection: op.Collection, DocumentID: op.DocumentID, }) } tx.mu.RUnlock() transactions = append(transactions, TransactionInfo{ ID: fmt.Sprintf("%d", tx.ID), Status: status, StartTime: tx.StartTime, OperationCount: opCount, Operations: operations, }) return true }) return transactions } // GetTransactionByID возвращает транзакцию по ID func GetTransactionByID(id string) (*Transaction, error) { if globalTxManager == nil { return nil, fmt.Errorf("transaction manager not initialized") } var txID TransactionID fmt.Sscanf(id, "%d", &txID) if val, ok := globalTxManager.activeTransactions.Load(txID); ok { return val.(*Transaction), nil } return nil, fmt.Errorf("transaction not found") } // FindInTransaction ищет документ в контексте транзакции func FindInTransaction(coll *Collection, id string) (*Document, error) { txVal := currentTx.Load() if txVal == nil { return coll.Find(id) } tx := txVal.(*Transaction) tx.mu.RLock() defer tx.mu.RUnlock() // Ищем в операциях транзакции в обратном порядке for i := len(tx.Operations) - 1; i >= 0; i-- { op := tx.Operations[i] if op.DocumentID == id { if op.Type == "delete" { return nil, fmt.Errorf("document deleted in transaction") } if op.Type == "insert" || op.Type == "update" { doc := NewDocumentWithID(op.DocumentID) for k, v := range op.Data { doc.SetField(k, v) } doc.Version = op.Version return doc, nil } } } return coll.Find(id) } // applyOperation применяет операцию к хранилищу func applyOperation(op Operation) error { db, err := GetGlobalStorage().GetDatabase(op.Database) if err != nil { return fmt.Errorf("database not found: %s", op.Database) } coll, err := db.GetCollection(op.Collection) if err != nil { return fmt.Errorf("collection not found: %s", op.Collection) } switch op.Type { case "insert": doc := NewDocumentWithID(op.DocumentID) for k, v := range op.Data { doc.SetField(k, v) } doc.Version = op.Version return coll.Insert(doc) case "update": return coll.Update(op.DocumentID, op.Data) case "delete": return coll.Delete(op.DocumentID) } return nil } // recoverFromWAL восстанавливает состояние из WAL после сбоя func (tm *TransactionManager) recoverFromWAL() { if tm.wal == nil { return } records, err := tm.wal.ReadAll() if err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err)) } return } for _, record := range records { if record.State == TransactionCommitted { // Повторно применяем закоммиченные операции for _, op := range record.Operations { if err := applyOperation(op); err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err)) } } } } // Транзакции в состоянии Active или Aborted игнорируем } if tm.logger != nil { tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records))) } } // snapshotLoop периодически создаёт снапшоты func (tm *TransactionManager) snapshotLoop() { ticker := time.NewTicker(time.Duration(tm.snapshotInterval) * time.Second) defer ticker.Stop() for range ticker.C { tm.createSnapshot() } } // createSnapshot создаёт снапшот текущего состояния func (tm *TransactionManager) createSnapshot() { now := time.Now().Unix() if now-tm.lastSnapshot < tm.snapshotInterval { return } snapshotPath := fmt.Sprintf("%s.snapshot.%d", tm.walPath, now) if err := GetGlobalStorage().Backup(snapshotPath); err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to create snapshot: %v", err)) } return } tm.lastSnapshot = now if tm.logger != nil { tm.logger.Info(fmt.Sprintf("Snapshot created: %s", snapshotPath)) } // Удаляем старые снапшоты go tm.cleanOldSnapshots() } // cleanOldSnapshots удаляет старые снапшоты func (tm *TransactionManager) cleanOldSnapshots() { // Реализация удаления старых снапшотов } // GetGlobalStorage возвращает глобальное хранилище (должно быть установлено при инициализации) var globalStorage *Storage // SetGlobalStorage устанавливает глобальное хранилище func SetGlobalStorage(s *Storage) { globalStorage = s } // GetGlobalStorage возвращает глобальное хранилище func GetGlobalStorage() *Storage { return globalStorage } // AuditLog записывает событие в аудит func AuditLog(operation, dataType, name string, details map[string]interface{}) { LogAudit(operation, dataType, name, details) } // MVCCSnapshot создаёт снапшот текущего состояния для MVCC func MVCCSnapshot() uint64 { return uint64(time.Now().UnixNano()) } // CreateDocumentVersion создаёт новую версию документа для MVCC func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { return &DocumentVersion{ Document: doc.Clone(), Timestamp: time.Now().UnixMilli(), TxID: txID, } }