// Файл: internal/storage/transaction.go // Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) // и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. package storage import ( "encoding/binary" "fmt" "os" "sync" "sync/atomic" "time" "futriis/internal/serializer" ) // TransactionID представляет уникальный идентификатор транзакции type TransactionID uint64 // TransactionState представляет состояние транзакции type TransactionState int32 const ( TransactionActive TransactionState = iota TransactionCommitted TransactionAborted ) // TransactionRecord представляет запись в WAL type TransactionRecord struct { ID TransactionID `msgpack:"id"` State TransactionState `msgpack:"state"` Timestamp int64 `msgpack:"timestamp"` Operations []Operation `msgpack:"operations"` } // Operation представляет одну операцию в транзакции type Operation struct { Type string `msgpack:"type"` // "insert", "update", "delete" Database string `msgpack:"database"` Collection string `msgpack:"collection"` DocumentID string `msgpack:"document_id"` Data map[string]interface{} `msgpack:"data"` Version uint64 `msgpack:"version"` } // DocumentVersion представляет версию документа для MVCC type DocumentVersion struct { Document *Document `msgpack:"document"` Timestamp int64 `msgpack:"timestamp"` TxID TransactionID `msgpack:"tx_id"` } // TransactionManager управляет транзакциями type TransactionManager struct { activeTransactions sync.Map // map[TransactionID]*Transaction nextTxID atomic.Uint64 wal *WriteAheadLog mu sync.RWMutex } // Transaction представляет одну транзакцию type Transaction struct { ID TransactionID State atomic.Int32 Operations []Operation StartTime int64 mu sync.RWMutex } // WriteAheadLog реализует журнал предзаписи type WriteAheadLog struct { file *os.File writeChan chan []byte done chan struct{} mu sync.RWMutex } var ( globalTxManager *TransactionManager txManagerOnce sync.Once currentTx atomic.Value // *Transaction ) // InitTransactionManager инициализирует глобальный менеджер транзакций func InitTransactionManager(walPath string) error { var err error txManagerOnce.Do(func() { globalTxManager = &TransactionManager{ nextTxID: atomic.Uint64{}, } globalTxManager.nextTxID.Store(1) err = globalTxManager.initWAL(walPath) }) return err } // initWAL инициализирует Write-Ahead Log func (tm *TransactionManager) initWAL(walPath string) error { wal, err := NewWriteAheadLog(walPath) if err != nil { return err } tm.wal = wal // Восстанавливаем состояние из WAL при запуске go tm.recoverFromWAL() return nil } // NewWriteAheadLog создаёт новый WAL func NewWriteAheadLog(path string) (*WriteAheadLog, error) { file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { return nil, err } wal := &WriteAheadLog{ file: file, writeChan: make(chan []byte, 10000), done: make(chan struct{}), } go wal.writerLoop() return wal, nil } // writerLoop асинхронно записывает данные в WAL func (wal *WriteAheadLog) writerLoop() { for data := range wal.writeChan { wal.mu.Lock() // Формат записи: [длина (4 байта)][данные] lenBuf := make([]byte, 4) binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) if _, err := wal.file.Write(lenBuf); err != nil { continue } if _, err := wal.file.Write(data); err != nil { continue } wal.file.Sync() wal.mu.Unlock() } close(wal.done) } // Write записывает запись в WAL func (wal *WriteAheadLog) Write(record *TransactionRecord) error { data, err := serializer.Marshal(record) if err != nil { return err } select { case wal.writeChan <- data: return nil default: return fmt.Errorf("WAL buffer full") } } // Close закрывает WAL func (wal *WriteAheadLog) Close() error { close(wal.writeChan) <-wal.done return wal.file.Close() } // BeginTransaction начинает новую транзакцию func BeginTransaction() *Transaction { if globalTxManager == nil { InitTransactionManager("futriis.wal") } tx := &Transaction{ ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), StartTime: time.Now().UnixMilli(), Operations: make([]Operation, 0), } tx.State.Store(int32(TransactionActive)) globalTxManager.activeTransactions.Store(tx.ID, tx) // Сохраняем как текущую транзакцию для горутины currentTx.Store(tx) // Записываем начало транзакции в WAL record := &TransactionRecord{ ID: tx.ID, State: TransactionActive, Timestamp: tx.StartTime, Operations: tx.Operations, } globalTxManager.wal.Write(record) return tx } // AddToTransaction добавляет операцию в текущую транзакцию func AddToTransaction(coll *Collection, opType string, doc *Document) error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) if TransactionState(tx.State.Load()) != TransactionActive { return fmt.Errorf("transaction is not active") } op := Operation{ Type: opType, Database: coll.Name(), // В реальной реализации нужно передавать имя БД Collection: coll.Name(), DocumentID: doc.ID, Data: doc.GetFields(), Version: doc.Version, } tx.mu.Lock() tx.Operations = append(tx.Operations, op) tx.mu.Unlock() return nil } // CommitCurrentTransaction коммитит текущую транзакцию func CommitCurrentTransaction() error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) if TransactionState(tx.State.Load()) != TransactionActive { return fmt.Errorf("transaction is not active") } // Применяем все операции атомарно for _, op := range tx.Operations { if err := applyOperation(op); err != nil { // Откатываем при ошибке AbortCurrentTransaction() return fmt.Errorf("transaction commit failed: %v", err) } } tx.State.Store(int32(TransactionCommitted)) // Записываем коммит в WAL record := &TransactionRecord{ ID: tx.ID, State: TransactionCommitted, Timestamp: time.Now().UnixMilli(), Operations: tx.Operations, } globalTxManager.wal.Write(record) // Очищаем текущую транзакцию currentTx.Store(nil) globalTxManager.activeTransactions.Delete(tx.ID) return nil } // AbortCurrentTransaction откатывает текущую транзакцию func AbortCurrentTransaction() error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) tx.State.Store(int32(TransactionAborted)) // Записываем откат в WAL record := &TransactionRecord{ ID: tx.ID, State: TransactionAborted, Timestamp: time.Now().UnixMilli(), Operations: tx.Operations, } globalTxManager.wal.Write(record) // Очищаем текущую транзакцию currentTx.Store(nil) globalTxManager.activeTransactions.Delete(tx.ID) return nil } // HasActiveTransaction проверяет наличие активной транзакции func HasActiveTransaction() bool { return currentTx.Load() != nil } // FindInTransaction ищет документ в контексте транзакции func FindInTransaction(coll *Collection, id string) (*Document, error) { txVal := currentTx.Load() if txVal == nil { return coll.Find(id) } tx := txVal.(*Transaction) // Сначала ищем в операциях транзакции for i := len(tx.Operations) - 1; i >= 0; i-- { op := tx.Operations[i] if op.DocumentID == id { if op.Type == "delete" { return nil, fmt.Errorf("key not found") } // Создаём документ из данных операции doc := NewDocumentWithID(op.DocumentID) for k, v := range op.Data { doc.SetField(k, v) } doc.Version = op.Version return doc, nil } } // Ищем в основном хранилище return coll.Find(id) } // applyOperation применяет операцию к хранилищу func applyOperation(op Operation) error { // В реальной реализации здесь будет применение операции к соответствующей коллекции // С использованием MVCC для версионирования switch op.Type { case "insert": // Проверяем версию документа (MVCC) doc := NewDocumentWithID(op.DocumentID) for k, v := range op.Data { doc.SetField(k, v) } // Здесь должна быть вставка в коллекцию return nil case "update": // Обновление с проверкой версии return nil case "delete": // Удаление return nil } return nil } // recoverFromWAL восстанавливает состояние из WAL после сбоя func (tm *TransactionManager) recoverFromWAL() { // В реальной реализации здесь будет чтение WAL и восстановление // незавершённых транзакций } // GetTransaction возвращает транзакцию по ID func GetTransaction(id TransactionID) (*Transaction, bool) { if val, ok := globalTxManager.activeTransactions.Load(id); ok { return val.(*Transaction), true } return nil, false } // MVCCSnapshot создаёт снапшот текущего состояния для MVCC func MVCCSnapshot() uint64 { return uint64(time.Now().UnixNano()) } // CreateDocumentVersion создаёт новую версию документа для MVCC func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { return &DocumentVersion{ Document: doc.Clone(), Timestamp: time.Now().UnixMilli(), TxID: txID, } }