From 062718d614b2834a5849e128651df183b6ff2c22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Wed, 29 Apr 2026 21:45:43 +0000 Subject: [PATCH] Delete internal/storage/transaction.go --- internal/storage/transaction.go | 392 -------------------------------- 1 file changed, 392 deletions(-) delete mode 100644 internal/storage/transaction.go diff --git a/internal/storage/transaction.go b/internal/storage/transaction.go deleted file mode 100644 index c4ecbad..0000000 --- a/internal/storage/transaction.go +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Copyright 2026 Safronov Grigorii - * - * Licensed under the CDDL, Version 1.0 (the "License"); - * you may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * https://opensource.org/licenses/CDDL-1.0 - */ - -// Файл: internal/storage/transaction.go -// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) -// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. - -package storage - -import ( - "encoding/binary" - "fmt" - "os" - "sync" - "sync/atomic" - "time" - - "futriis/internal/serializer" -) - -// TransactionID представляет уникальный идентификатор транзакции -type TransactionID uint64 - -// TransactionState представляет состояние транзакции -type TransactionState int32 - -const ( - TransactionActive TransactionState = iota - TransactionCommitted - TransactionAborted -) - -// TransactionRecord представляет запись в WAL -type TransactionRecord struct { - ID TransactionID `msgpack:"id"` - State TransactionState `msgpack:"state"` - Timestamp int64 `msgpack:"timestamp"` - Operations []Operation `msgpack:"operations"` -} - -// Operation представляет одну операцию в транзакции -type Operation struct { - Type string `msgpack:"type"` // "insert", "update", "delete" - Database string `msgpack:"database"` - Collection string `msgpack:"collection"` - DocumentID string `msgpack:"document_id"` - Data map[string]interface{} `msgpack:"data"` - Version uint64 `msgpack:"version"` -} - -// DocumentVersion представляет версию документа для MVCC -type DocumentVersion struct { - Document *Document `msgpack:"document"` - Timestamp int64 `msgpack:"timestamp"` - TxID TransactionID `msgpack:"tx_id"` -} - -// TransactionManager управляет транзакциями -type TransactionManager struct { - activeTransactions sync.Map // map[TransactionID]*Transaction - nextTxID atomic.Uint64 - wal *WriteAheadLog - mu sync.RWMutex -} - -// Transaction представляет одну транзакцию -type Transaction struct { - ID TransactionID - State atomic.Int32 - Operations []Operation - StartTime int64 - mu sync.RWMutex -} - -// WriteAheadLog реализует журнал предзаписи -type WriteAheadLog struct { - file *os.File - writeChan chan []byte - done chan struct{} - mu sync.RWMutex -} - -var ( - globalTxManager *TransactionManager - txManagerOnce sync.Once - currentTx atomic.Value // *Transaction -) - -// InitTransactionManager инициализирует глобальный менеджер транзакций -func InitTransactionManager(walPath string) error { - var err error - txManagerOnce.Do(func() { - globalTxManager = &TransactionManager{ - nextTxID: atomic.Uint64{}, - } - globalTxManager.nextTxID.Store(1) - err = globalTxManager.initWAL(walPath) - }) - return err -} - -// initWAL инициализирует Write-Ahead Log -func (tm *TransactionManager) initWAL(walPath string) error { - wal, err := NewWriteAheadLog(walPath) - if err != nil { - return err - } - tm.wal = wal - - // Восстанавливаем состояние из WAL при запуске - go tm.recoverFromWAL() - - return nil -} - -// NewWriteAheadLog создаёт новый WAL -func NewWriteAheadLog(path string) (*WriteAheadLog, error) { - file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - return nil, err - } - - wal := &WriteAheadLog{ - file: file, - writeChan: make(chan []byte, 10000), - done: make(chan struct{}), - } - - go wal.writerLoop() - - return wal, nil -} - -// writerLoop асинхронно записывает данные в WAL -func (wal *WriteAheadLog) writerLoop() { - for data := range wal.writeChan { - wal.mu.Lock() - // Формат записи: [длина (4 байта)][данные] - lenBuf := make([]byte, 4) - binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) - - if _, err := wal.file.Write(lenBuf); err != nil { - continue - } - if _, err := wal.file.Write(data); err != nil { - continue - } - wal.file.Sync() - wal.mu.Unlock() - } - close(wal.done) -} - -// Write записывает запись в WAL -func (wal *WriteAheadLog) Write(record *TransactionRecord) error { - data, err := serializer.Marshal(record) - if err != nil { - return err - } - - select { - case wal.writeChan <- data: - return nil - default: - return fmt.Errorf("WAL buffer full") - } -} - -// Close закрывает WAL -func (wal *WriteAheadLog) Close() error { - close(wal.writeChan) - <-wal.done - return wal.file.Close() -} - -// BeginTransaction начинает новую транзакцию -func BeginTransaction() *Transaction { - if globalTxManager == nil { - InitTransactionManager("futriis.wal") - } - - tx := &Transaction{ - ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), - StartTime: time.Now().UnixMilli(), - Operations: make([]Operation, 0), - } - tx.State.Store(int32(TransactionActive)) - - globalTxManager.activeTransactions.Store(tx.ID, tx) - - // Сохраняем как текущую транзакцию для горутины - currentTx.Store(tx) - - // Записываем начало транзакции в WAL - record := &TransactionRecord{ - ID: tx.ID, - State: TransactionActive, - Timestamp: tx.StartTime, - Operations: tx.Operations, - } - globalTxManager.wal.Write(record) - - return tx -} - -// AddToTransaction добавляет операцию в текущую транзакцию -func AddToTransaction(coll *Collection, opType string, doc *Document) error { - txVal := currentTx.Load() - if txVal == nil { - return fmt.Errorf("no active transaction") - } - - tx := txVal.(*Transaction) - if TransactionState(tx.State.Load()) != TransactionActive { - return fmt.Errorf("transaction is not active") - } - - op := Operation{ - Type: opType, - Database: coll.Name(), // В реальной реализации нужно передавать имя БД - Collection: coll.Name(), - DocumentID: doc.ID, - Data: doc.GetFields(), - Version: doc.Version, - } - - tx.mu.Lock() - tx.Operations = append(tx.Operations, op) - tx.mu.Unlock() - - return nil -} - -// CommitCurrentTransaction коммитит текущую транзакцию -func CommitCurrentTransaction() error { - txVal := currentTx.Load() - if txVal == nil { - return fmt.Errorf("no active transaction") - } - - tx := txVal.(*Transaction) - if TransactionState(tx.State.Load()) != TransactionActive { - return fmt.Errorf("transaction is not active") - } - - // Применяем все операции атомарно - for _, op := range tx.Operations { - if err := applyOperation(op); err != nil { - // Откатываем при ошибке - AbortCurrentTransaction() - return fmt.Errorf("transaction commit failed: %v", err) - } - } - - tx.State.Store(int32(TransactionCommitted)) - - // Записываем коммит в WAL - record := &TransactionRecord{ - ID: tx.ID, - State: TransactionCommitted, - Timestamp: time.Now().UnixMilli(), - Operations: tx.Operations, - } - globalTxManager.wal.Write(record) - - // Очищаем текущую транзакцию - currentTx.Store(nil) - globalTxManager.activeTransactions.Delete(tx.ID) - - return nil -} - -// AbortCurrentTransaction откатывает текущую транзакцию -func AbortCurrentTransaction() error { - txVal := currentTx.Load() - if txVal == nil { - return fmt.Errorf("no active transaction") - } - - tx := txVal.(*Transaction) - tx.State.Store(int32(TransactionAborted)) - - // Записываем откат в WAL - record := &TransactionRecord{ - ID: tx.ID, - State: TransactionAborted, - Timestamp: time.Now().UnixMilli(), - Operations: tx.Operations, - } - globalTxManager.wal.Write(record) - - // Очищаем текущую транзакцию - currentTx.Store(nil) - globalTxManager.activeTransactions.Delete(tx.ID) - - return nil -} - -// HasActiveTransaction проверяет наличие активной транзакции -func HasActiveTransaction() bool { - return currentTx.Load() != nil -} - -// FindInTransaction ищет документ в контексте транзакции -func FindInTransaction(coll *Collection, id string) (*Document, error) { - txVal := currentTx.Load() - if txVal == nil { - return coll.Find(id) - } - - tx := txVal.(*Transaction) - - // Сначала ищем в операциях транзакции - for i := len(tx.Operations) - 1; i >= 0; i-- { - op := tx.Operations[i] - if op.DocumentID == id { - if op.Type == "delete" { - return nil, fmt.Errorf("key not found") - } - // Создаём документ из данных операции - doc := NewDocumentWithID(op.DocumentID) - for k, v := range op.Data { - doc.SetField(k, v) - } - doc.Version = op.Version - return doc, nil - } - } - - // Ищем в основном хранилище - return coll.Find(id) -} - -// applyOperation применяет операцию к хранилищу -func applyOperation(op Operation) error { - // В реальной реализации здесь будет применение операции к соответствующей коллекции - // С использованием MVCC для версионирования - - switch op.Type { - case "insert": - // Проверяем версию документа (MVCC) - doc := NewDocumentWithID(op.DocumentID) - for k, v := range op.Data { - doc.SetField(k, v) - } - // Здесь должна быть вставка в коллекцию - return nil - case "update": - // Обновление с проверкой версии - return nil - case "delete": - // Удаление - return nil - } - - return nil -} - -// recoverFromWAL восстанавливает состояние из WAL после сбоя -func (tm *TransactionManager) recoverFromWAL() { - // В реальной реализации здесь будет чтение WAL и восстановление - // незавершённых транзакций -} - -// GetTransaction возвращает транзакцию по ID -func GetTransaction(id TransactionID) (*Transaction, bool) { - if val, ok := globalTxManager.activeTransactions.Load(id); ok { - return val.(*Transaction), true - } - return nil, false -} - -// MVCCSnapshot создаёт снапшот текущего состояния для MVCC -func MVCCSnapshot() uint64 { - return uint64(time.Now().UnixNano()) -} - -// CreateDocumentVersion создаёт новую версию документа для MVCC -func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { - return &DocumentVersion{ - Document: doc.Clone(), - Timestamp: time.Now().UnixMilli(), - TxID: txID, - } -}