From 70cc725add29f639bdc4688e86f14471e180a548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 17 May 2026 14:32:12 +0000 Subject: [PATCH] Delete internal/storage/transaction.go --- internal/storage/transaction.go | 722 -------------------------------- 1 file changed, 722 deletions(-) delete mode 100644 internal/storage/transaction.go diff --git a/internal/storage/transaction.go b/internal/storage/transaction.go deleted file mode 100644 index 154dc99..0000000 --- a/internal/storage/transaction.go +++ /dev/null @@ -1,722 +0,0 @@ -/* - * Copyright 2026 Safronov Grigorii - * - * Licensed under the CDDL, Version 1.0 (the "License"); - * you may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * https://opensource.org/licenses/CDDL-1.0 - */ - -// Файл: internal/storage/transaction.go -// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) -// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. - -package storage - -import ( - "encoding/binary" - "fmt" - "os" - "sync" - "sync/atomic" - "time" - - "futriis/internal/serializer" -) - -// TransactionID представляет уникальный идентификатор транзакции -type TransactionID uint64 - -// TransactionState представляет состояние транзакции -type TransactionState int32 - -const ( - TransactionActive TransactionState = iota - TransactionCommitted - TransactionAborted -) - -// TransactionRecord представляет запись в WAL -type TransactionRecord struct { - ID TransactionID `msgpack:"id"` - State TransactionState `msgpack:"state"` - Timestamp int64 `msgpack:"timestamp"` - Operations []Operation `msgpack:"operations"` -} - -// Operation представляет одну операцию в транзакции -type Operation struct { - Type string `msgpack:"type"` // "insert", "update", "delete" - Database string `msgpack:"database"` - Collection string `msgpack:"collection"` - DocumentID string `msgpack:"document_id"` - Data map[string]interface{} `msgpack:"data"` - Version uint64 `msgpack:"version"` - OldData map[string]interface{} `msgpack:"old_data"` // для отката -} - -// DocumentVersion представляет версию документа для MVCC -type DocumentVersion struct { - Document *Document `msgpack:"document"` - Timestamp int64 `msgpack:"timestamp"` - TxID TransactionID `msgpack:"tx_id"` -} - -// TransactionInfo представляет информацию о транзакции для API -type TransactionInfo struct { - ID string `json:"id"` - Status string `json:"status"` - StartTime int64 `json:"start_time"` - OperationCount int `json:"operation_count"` - Operations []OperationInfo `json:"operations,omitempty"` -} - -// OperationInfo представляет информацию об операции для API -type OperationInfo struct { - Type string `json:"type"` - Database string `json:"database"` - Collection string `json:"collection"` - DocumentID string `json:"document_id"` -} - -// LoggerInterface определяет интерфейс для логирования -type LoggerInterface interface { - Debug(msg string) - Info(msg string) - Error(msg string) - Warn(msg string) -} - -// TransactionManager управляет транзакциями -type TransactionManager struct { - activeTransactions sync.Map // map[TransactionID]*Transaction - nextTxID atomic.Uint64 - wal *WriteAheadLog - logger LoggerInterface - mu sync.RWMutex - walPath string - snapshotInterval int64 // интервал создания снапшотов в секундах - lastSnapshot int64 -} - -// Transaction представляет одну транзакцию -type Transaction struct { - ID TransactionID - State atomic.Int32 - Operations []Operation - StartTime int64 - mu sync.RWMutex -} - -// WriteAheadLog реализует журнал предзаписи -type WriteAheadLog struct { - file *os.File - writeChan chan []byte - done chan struct{} - mu sync.RWMutex - syncInterval time.Duration - lastSync time.Time -} - -var ( - globalTxManager *TransactionManager - txManagerOnce sync.Once - currentTx atomic.Value // *Transaction -) - -// InitTransactionManager инициализирует глобальный менеджер транзакций -func InitTransactionManager(walPath string) error { - var err error - txManagerOnce.Do(func() { - globalTxManager = &TransactionManager{ - nextTxID: atomic.Uint64{}, - walPath: walPath, - snapshotInterval: 300, // 5 минут - lastSnapshot: time.Now().Unix(), - } - globalTxManager.nextTxID.Store(1) - err = globalTxManager.initWAL(walPath) - if err == nil { - // Восстанавливаем состояние из WAL - go globalTxManager.recoverFromWAL() - // Запускаем периодическое создание снапшотов - go globalTxManager.snapshotLoop() - } - }) - return err -} - -// SetTransactionLogger устанавливает логгер для транзакций -func SetTransactionLogger(logger LoggerInterface) { - if globalTxManager != nil { - globalTxManager.logger = logger - } -} - -// initWAL инициализирует Write-Ahead Log -func (tm *TransactionManager) initWAL(walPath string) error { - wal, err := NewWriteAheadLog(walPath) - if err != nil { - return err - } - tm.wal = wal - return nil -} - -// NewWriteAheadLog создаёт новый WAL -func NewWriteAheadLog(path string) (*WriteAheadLog, error) { - file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) - if err != nil { - return nil, err - } - - wal := &WriteAheadLog{ - file: file, - writeChan: make(chan []byte, 10000), - done: make(chan struct{}), - syncInterval: 5 * time.Second, - lastSync: time.Now(), - } - - go wal.writerLoop() - - return wal, nil -} - -// writerLoop асинхронно записывает данные в WAL -func (wal *WriteAheadLog) writerLoop() { - batch := make([][]byte, 0, 100) - ticker := time.NewTicker(wal.syncInterval) - defer ticker.Stop() - - for { - select { - case data, ok := <-wal.writeChan: - if !ok { - if len(batch) > 0 { - wal.flushBatch(batch) - } - close(wal.done) - return - } - batch = append(batch, data) - if len(batch) >= 100 { - wal.flushBatch(batch) - batch = batch[:0] - } - - case <-ticker.C: - if len(batch) > 0 { - wal.flushBatch(batch) - batch = batch[:0] - } - wal.sync() - } - } -} - -// flushBatch записывает пакет данных в файл -func (wal *WriteAheadLog) flushBatch(batch [][]byte) { - wal.mu.Lock() - defer wal.mu.Unlock() - - for _, data := range batch { - lenBuf := make([]byte, 4) - binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) - - if _, err := wal.file.Write(lenBuf); err != nil { - continue - } - if _, err := wal.file.Write(data); err != nil { - continue - } - } -} - -// sync синхронизирует файл с диском -func (wal *WriteAheadLog) sync() { - wal.mu.Lock() - defer wal.mu.Unlock() - - if time.Since(wal.lastSync) >= wal.syncInterval { - wal.file.Sync() - wal.lastSync = time.Now() - } -} - -// Write записывает запись в WAL -func (wal *WriteAheadLog) Write(record *TransactionRecord) error { - data, err := serializer.Marshal(record) - if err != nil { - return err - } - - select { - case wal.writeChan <- data: - return nil - case <-time.After(5 * time.Second): - return fmt.Errorf("WAL write timeout") - } -} - -// Close закрывает WAL -func (wal *WriteAheadLog) Close() error { - close(wal.writeChan) - <-wal.done - return wal.file.Close() -} - -// ReadAll читает все записи из WAL -func (wal *WriteAheadLog) ReadAll() ([]*TransactionRecord, error) { - wal.mu.RLock() - defer wal.mu.RUnlock() - - if _, err := wal.file.Seek(0, 0); err != nil { - return nil, err - } - - records := make([]*TransactionRecord, 0) - buf := make([]byte, 4) - - for { - _, err := wal.file.Read(buf) - if err != nil { - break - } - - recordLen := binary.BigEndian.Uint32(buf) - recordData := make([]byte, recordLen) - _, err = wal.file.Read(recordData) - if err != nil { - break - } - - var record TransactionRecord - if err := serializer.Unmarshal(recordData, &record); err != nil { - continue - } - records = append(records, &record) - } - - return records, nil -} - -// BeginTransaction начинает новую транзакцию -func BeginTransaction() *Transaction { - if globalTxManager == nil { - InitTransactionManager("futriis.wal") - } - - tx := &Transaction{ - ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), - StartTime: time.Now().UnixMilli(), - Operations: make([]Operation, 0), - } - tx.State.Store(int32(TransactionActive)) - - globalTxManager.activeTransactions.Store(tx.ID, tx) - currentTx.Store(tx) - - // Записываем начало транзакции в WAL - record := &TransactionRecord{ - ID: tx.ID, - State: TransactionActive, - Timestamp: tx.StartTime, - Operations: tx.Operations, - } - if globalTxManager.wal != nil { - globalTxManager.wal.Write(record) - } - - if globalTxManager.logger != nil { - globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID)) - } - - AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ - "start_time": tx.StartTime, - }) - - return tx -} - -// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции -func BeginTransactionOnCollection(coll *Collection) error { - if globalTxManager == nil { - if err := InitTransactionManager("futriis.wal"); err != nil { - return err - } - } - - tx := BeginTransaction() - if tx == nil { - return fmt.Errorf("failed to create transaction") - } - - if globalTxManager.logger != nil { - globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name)) - } - - return nil -} - -// AddToTransaction добавляет операцию в текущую транзакцию -func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error { - txVal := currentTx.Load() - if txVal == nil { - return fmt.Errorf("no active transaction") - } - - tx := txVal.(*Transaction) - if TransactionState(tx.State.Load()) != TransactionActive { - return fmt.Errorf("transaction is not active") - } - - op := Operation{ - Type: opType, - Database: coll.dbName, - Collection: coll.name, - DocumentID: docID, - Data: newData, - OldData: oldData, - Version: version, - } - - tx.mu.Lock() - tx.Operations = append(tx.Operations, op) - tx.mu.Unlock() - - if globalTxManager.logger != nil { - globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID)) - } - - return nil -} - -// CommitCurrentTransaction коммитит текущую транзакцию -func CommitCurrentTransaction() error { - txVal := currentTx.Load() - if txVal == nil { - return fmt.Errorf("no active transaction") - } - - tx := txVal.(*Transaction) - if TransactionState(tx.State.Load()) != TransactionActive { - return fmt.Errorf("transaction is not active") - } - - // Применяем все операции атомарно - for _, op := range tx.Operations { - if err := applyOperation(op); err != nil { - // Откатываем при ошибке - AbortCurrentTransaction() - return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err) - } - } - - tx.State.Store(int32(TransactionCommitted)) - - // Записываем коммит в WAL - record := &TransactionRecord{ - ID: tx.ID, - State: TransactionCommitted, - Timestamp: time.Now().UnixMilli(), - Operations: tx.Operations, - } - if globalTxManager.wal != nil { - globalTxManager.wal.Write(record) - } - - if globalTxManager.logger != nil { - globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations))) - } - - AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ - "operations": len(tx.Operations), - }) - - // Очищаем текущую транзакцию - currentTx.Store(nil) - globalTxManager.activeTransactions.Delete(tx.ID) - - return nil -} - -// AbortCurrentTransaction откатывает текущую транзакцию -func AbortCurrentTransaction() error { - txVal := currentTx.Load() - if txVal == nil { - return fmt.Errorf("no active transaction") - } - - tx := txVal.(*Transaction) - tx.State.Store(int32(TransactionAborted)) - - // Записываем откат в WAL - record := &TransactionRecord{ - ID: tx.ID, - State: TransactionAborted, - Timestamp: time.Now().UnixMilli(), - Operations: tx.Operations, - } - if globalTxManager.wal != nil { - globalTxManager.wal.Write(record) - } - - if globalTxManager.logger != nil { - globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations))) - } - - AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ - "operations": len(tx.Operations), - }) - - // Очищаем текущую транзакцию - currentTx.Store(nil) - globalTxManager.activeTransactions.Delete(tx.ID) - - return nil -} - -// HasActiveTransaction проверяет наличие активной транзакции -func HasActiveTransaction() bool { - return currentTx.Load() != nil -} - -// GetCurrentTransactionID возвращает ID текущей транзакции -func GetCurrentTransactionID() string { - txVal := currentTx.Load() - if txVal == nil { - return "" - } - tx := txVal.(*Transaction) - return fmt.Sprintf("%d", tx.ID) -} - -// GetActiveTransactions возвращает список активных транзакций для API -func GetActiveTransactions() []TransactionInfo { - if globalTxManager == nil { - return []TransactionInfo{} - } - - transactions := make([]TransactionInfo, 0) - - globalTxManager.activeTransactions.Range(func(key, value interface{}) bool { - tx := value.(*Transaction) - status := "active" - if TransactionState(tx.State.Load()) == TransactionCommitted { - status = "committed" - } else if TransactionState(tx.State.Load()) == TransactionAborted { - status = "aborted" - } - - tx.mu.RLock() - opCount := len(tx.Operations) - operations := make([]OperationInfo, 0, opCount) - for _, op := range tx.Operations { - operations = append(operations, OperationInfo{ - Type: op.Type, - Database: op.Database, - Collection: op.Collection, - DocumentID: op.DocumentID, - }) - } - tx.mu.RUnlock() - - transactions = append(transactions, TransactionInfo{ - ID: fmt.Sprintf("%d", tx.ID), - Status: status, - StartTime: tx.StartTime, - OperationCount: opCount, - Operations: operations, - }) - return true - }) - - return transactions -} - -// GetTransactionByID возвращает транзакцию по ID -func GetTransactionByID(id string) (*Transaction, error) { - if globalTxManager == nil { - return nil, fmt.Errorf("transaction manager not initialized") - } - - var txID TransactionID - fmt.Sscanf(id, "%d", &txID) - - if val, ok := globalTxManager.activeTransactions.Load(txID); ok { - return val.(*Transaction), nil - } - - return nil, fmt.Errorf("transaction not found") -} - -// FindInTransaction ищет документ в контексте транзакции -func FindInTransaction(coll *Collection, id string) (*Document, error) { - txVal := currentTx.Load() - if txVal == nil { - return coll.Find(id) - } - - tx := txVal.(*Transaction) - - tx.mu.RLock() - defer tx.mu.RUnlock() - - // Ищем в операциях транзакции в обратном порядке - for i := len(tx.Operations) - 1; i >= 0; i-- { - op := tx.Operations[i] - if op.DocumentID == id { - if op.Type == "delete" { - return nil, fmt.Errorf("document deleted in transaction") - } - if op.Type == "insert" || op.Type == "update" { - doc := NewDocumentWithID(op.DocumentID) - for k, v := range op.Data { - doc.SetField(k, v) - } - doc.Version = op.Version - return doc, nil - } - } - } - - return coll.Find(id) -} - -// applyOperation применяет операцию к хранилищу -func applyOperation(op Operation) error { - db, err := GetGlobalStorage().GetDatabase(op.Database) - if err != nil { - return fmt.Errorf("database not found: %s", op.Database) - } - - coll, err := db.GetCollection(op.Collection) - if err != nil { - return fmt.Errorf("collection not found: %s", op.Collection) - } - - switch op.Type { - case "insert": - doc := NewDocumentWithID(op.DocumentID) - for k, v := range op.Data { - doc.SetField(k, v) - } - doc.Version = op.Version - return coll.Insert(doc) - - case "update": - return coll.Update(op.DocumentID, op.Data) - - case "delete": - return coll.Delete(op.DocumentID) - } - - return nil -} - -// recoverFromWAL восстанавливает состояние из WAL после сбоя -func (tm *TransactionManager) recoverFromWAL() { - if tm.wal == nil { - return - } - - records, err := tm.wal.ReadAll() - if err != nil { - if tm.logger != nil { - tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err)) - } - return - } - - for _, record := range records { - if record.State == TransactionCommitted { - // Повторно применяем закоммиченные операции - for _, op := range record.Operations { - if err := applyOperation(op); err != nil { - if tm.logger != nil { - tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err)) - } - } - } - } - // Транзакции в состоянии Active или Aborted игнорируем - } - - if tm.logger != nil { - tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records))) - } -} - -// snapshotLoop периодически создаёт снапшоты -func (tm *TransactionManager) snapshotLoop() { - ticker := time.NewTicker(time.Duration(tm.snapshotInterval) * time.Second) - defer ticker.Stop() - - for range ticker.C { - tm.createSnapshot() - } -} - -// createSnapshot создаёт снапшот текущего состояния -func (tm *TransactionManager) createSnapshot() { - now := time.Now().Unix() - if now-tm.lastSnapshot < tm.snapshotInterval { - return - } - - snapshotPath := fmt.Sprintf("%s.snapshot.%d", tm.walPath, now) - if err := GetGlobalStorage().Backup(snapshotPath); err != nil { - if tm.logger != nil { - tm.logger.Error(fmt.Sprintf("Failed to create snapshot: %v", err)) - } - return - } - - tm.lastSnapshot = now - - if tm.logger != nil { - tm.logger.Info(fmt.Sprintf("Snapshot created: %s", snapshotPath)) - } - - // Удаляем старые снапшоты - go tm.cleanOldSnapshots() -} - -// cleanOldSnapshots удаляет старые снапшоты -func (tm *TransactionManager) cleanOldSnapshots() { - // Реализация удаления старых снапшотов -} - -// GetGlobalStorage возвращает глобальное хранилище (должно быть установлено при инициализации) -var globalStorage *Storage - -// SetGlobalStorage устанавливает глобальное хранилище -func SetGlobalStorage(s *Storage) { - globalStorage = s -} - -// GetGlobalStorage возвращает глобальное хранилище -func GetGlobalStorage() *Storage { - return globalStorage -} - -// AuditLog записывает событие в аудит -func AuditLog(operation, dataType, name string, details map[string]interface{}) { - LogAudit(operation, dataType, name, details) -} - -// MVCCSnapshot создаёт снапшот текущего состояния для MVCC -func MVCCSnapshot() uint64 { - return uint64(time.Now().UnixNano()) -} - -// CreateDocumentVersion создаёт новую версию документа для MVCC -func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { - return &DocumentVersion{ - Document: doc.Clone(), - Timestamp: time.Now().UnixMilli(), - TxID: txID, - } -}