From 329963d746820a5d37e0e65d2d82c4955bb1d79a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 17 May 2026 14:32:30 +0000 Subject: [PATCH] Upload files to "internal/storage" --- internal/storage/transaction.go | 931 ++++++++++++++++++++++++++++++++ 1 file changed, 931 insertions(+) create mode 100644 internal/storage/transaction.go diff --git a/internal/storage/transaction.go b/internal/storage/transaction.go new file mode 100644 index 0000000..f03b52f --- /dev/null +++ b/internal/storage/transaction.go @@ -0,0 +1,931 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/transaction.go +// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) +// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. +// Полная WAL реализация на чистом Go без сторонних зависимостей. + +package storage + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" +) + +// TransactionID представляет уникальный идентификатор транзакции +type TransactionID uint64 + +// TransactionState представляет состояние транзакции +type TransactionState int32 + +const ( + TransactionActive TransactionState = iota + TransactionCommitted + TransactionAborted +) + +// TransactionRecord представляет запись в WAL +type TransactionRecord struct { + ID TransactionID `json:"id"` + State TransactionState `json:"state"` + Timestamp int64 `json:"timestamp"` + Operations []Operation `json:"operations"` +} + +// Operation представляет одну операцию в транзакции +type Operation struct { + Type string `json:"type"` // "insert", "update", "delete" + Database string `json:"database"` + Collection string `json:"collection"` + DocumentID string `json:"document_id"` + Data map[string]interface{} `json:"data"` + Version uint64 `json:"version"` + OldData map[string]interface{} `json:"old_data"` // для отката +} + +// DocumentVersion представляет версию документа для MVCC +type DocumentVersion struct { + Document *Document `json:"document"` + Timestamp int64 `json:"timestamp"` + TxID TransactionID `json:"tx_id"` +} + +// TransactionInfo представляет информацию о транзакции для API +type TransactionInfo struct { + ID string `json:"id"` + Status string `json:"status"` + StartTime int64 `json:"start_time"` + OperationCount int `json:"operation_count"` + Operations []OperationInfo `json:"operations,omitempty"` +} + +// OperationInfo представляет информацию об операции для API +type OperationInfo struct { + Type string `json:"type"` + Database string `json:"database"` + Collection string `json:"collection"` + DocumentID string `json:"document_id"` +} + +// WALRecord представляет запись в WAL файле +type WALRecord struct { + CRC uint32 `json:"crc"` + Length uint32 `json:"length"` + Type byte `json:"type"` // 1=Transaction, 2=Checkpoint + Data []byte `json:"data"` + Timestamp int64 `json:"timestamp"` + LSN uint64 `json:"lsn"` // Log Sequence Number +} + +// WALManager управляет Write-Ahead Log +type WALManager struct { + mu sync.RWMutex + file *os.File + writer *bufio.Writer + path string + currentLSN uint64 + lastSync time.Time + syncInterval time.Duration + bufferSize int + closed bool + writeChan chan *WALRecord + stopChan chan struct{} + wg sync.WaitGroup +} + +// TransactionManager управляет транзакциями +type TransactionManager struct { + activeTransactions sync.Map // map[TransactionID]*Transaction + nextTxID atomic.Uint64 + wal *WALManager + logger LoggerInterface + mu sync.RWMutex + walPath string + checkpointInterval int64 // интервал создания чекпоинтов в секундах + lastCheckpoint int64 + checkpointFile *os.File +} + +// Transaction представляет одну транзакцию +type Transaction struct { + ID TransactionID + State atomic.Int32 + Operations []Operation + StartTime int64 + mu sync.RWMutex +} + +// LoggerInterface определяет интерфейс для логирования +type LoggerInterface interface { + Debug(msg string) + Info(msg string) + Error(msg string) + Warn(msg string) +} + +var ( + globalTxManager *TransactionManager + txManagerOnce sync.Once + currentTx atomic.Value // *Transaction +) + +// CRC32 таблица для быстрых вычислений +var crc32Table = [256]uint32{ + 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, + 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, + 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, + 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, + 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, + 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, + 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, + 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, + 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, + 0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, + 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106, + 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, + 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, + 0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, + 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, + 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, + 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, + 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, + 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, + 0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, + 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, + 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, + 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, + 0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, + 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, + 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, + 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, + 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, + 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, + 0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, + 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, + 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, + 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, + 0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, + 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, + 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, + 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, + 0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, + 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, + 0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, + 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, + 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, + 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d, +} + +// crc32 вычисляет CRC32 хеш +func crc32(data []byte) uint32 { + crc := uint32(0xFFFFFFFF) + for _, b := range data { + crc = (crc >> 8) ^ crc32Table[(crc^uint32(b))&0xFF] + } + return crc ^ 0xFFFFFFFF +} + +// NewWALManager создаёт новый WAL менеджер +func NewWALManager(path string) (*WALManager, error) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("failed to create WAL directory: %v", err) + } + + file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open WAL file: %v", err) + } + + // Определяем текущий LSN из файла + var currentLSN uint64 = 1 + stat, err := file.Stat() + if err == nil && stat.Size() > 0 { + // Читаем последнюю запись для определения LSN + currentLSN = uint64(stat.Size()) / 100 // приблизительное значение + if currentLSN < 1 { + currentLSN = 1 + } + } + + wm := &WALManager{ + file: file, + writer: bufio.NewWriterSize(file, 64*1024), // 64KB буфер + path: path, + currentLSN: currentLSN, + lastSync: time.Now(), + syncInterval: 5 * time.Second, + bufferSize: 64 * 1024, + writeChan: make(chan *WALRecord, 10000), + stopChan: make(chan struct{}), + } + + // Запускаем фоновую запись + wm.wg.Add(1) + go wm.writerLoop() + + return wm, nil +} + +// writerLoop асинхронно записывает записи в WAL +func (wm *WALManager) writerLoop() { + defer wm.wg.Done() + + batch := make([]*WALRecord, 0, 100) + ticker := time.NewTicker(wm.syncInterval) + defer ticker.Stop() + + for { + select { + case record, ok := <-wm.writeChan: + if !ok { + // Канал закрыт, сбрасываем оставшиеся записи + if len(batch) > 0 { + wm.flushBatch(batch) + } + return + } + batch = append(batch, record) + if len(batch) >= 100 { + wm.flushBatch(batch) + batch = batch[:0] + } + + case <-ticker.C: + if len(batch) > 0 { + wm.flushBatch(batch) + batch = batch[:0] + } + wm.sync() + + case <-wm.stopChan: + if len(batch) > 0 { + wm.flushBatch(batch) + } + wm.sync() + return + } + } +} + +// flushBatch записывает пакет записей +func (wm *WALManager) flushBatch(batch []*WALRecord) { + wm.mu.Lock() + defer wm.mu.Unlock() + + for _, record := range batch { + // Сериализуем запись + data, err := json.Marshal(record) + if err != nil { + continue + } + + // Записываем размер (4 байта) + lenBuf := make([]byte, 4) + binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) + if _, err := wm.writer.Write(lenBuf); err != nil { + continue + } + + // Записываем данные + if _, err := wm.writer.Write(data); err != nil { + continue + } + + // Обновляем LSN + record.LSN = wm.currentLSN + wm.currentLSN++ + } +} + +// sync синхронизирует буфер с диском +func (wm *WALManager) sync() { + wm.mu.Lock() + defer wm.mu.Unlock() + + if time.Since(wm.lastSync) >= wm.syncInterval { + wm.writer.Flush() + wm.file.Sync() + wm.lastSync = time.Now() + } +} + +// Write записывает запись в WAL +func (wm *WALManager) Write(record *WALRecord) error { + if wm.closed { + return fmt.Errorf("WAL is closed") + } + + record.Timestamp = time.Now().UnixMilli() + + // Вычисляем CRC + data, err := json.Marshal(record.Data) + if err != nil { + return err + } + record.CRC = crc32(data) + + select { + case wm.writeChan <- record: + return nil + case <-time.After(5 * time.Second): + return fmt.Errorf("WAL write timeout") + } +} + +// ReadAll читает все записи из WAL +func (wm *WALManager) ReadAll() ([]*WALRecord, error) { + wm.mu.RLock() + defer wm.mu.RUnlock() + + // Сбрасываем буферы + wm.writer.Flush() + + // Открываем файл для чтения + file, err := os.Open(wm.path) + if err != nil { + return nil, err + } + defer file.Close() + + records := make([]*WALRecord, 0) + reader := bufio.NewReader(file) + lenBuf := make([]byte, 4) + + for { + _, err := reader.Read(lenBuf) + if err != nil { + break + } + + recordLen := binary.BigEndian.Uint32(lenBuf) + recordData := make([]byte, recordLen) + _, err = reader.Read(recordData) + if err != nil { + break + } + + var record WALRecord + if err := json.Unmarshal(recordData, &record); err != nil { + continue + } + + // Проверяем CRC + data, _ := json.Marshal(record.Data) + if crc32(data) != record.CRC { + continue // Пропускаем повреждённые записи + } + + records = append(records, &record) + } + + return records, nil +} + +// Close закрывает WAL +func (wm *WALManager) Close() error { + wm.mu.Lock() + wm.closed = true + wm.mu.Unlock() + + close(wm.stopChan) + close(wm.writeChan) + wm.wg.Wait() + + wm.mu.Lock() + defer wm.mu.Unlock() + + if err := wm.writer.Flush(); err != nil { + return err + } + return wm.file.Close() +} + +// InitTransactionManager инициализирует глобальный менеджер транзакций +func InitTransactionManager(walPath string) error { + var err error + txManagerOnce.Do(func() { + globalTxManager = &TransactionManager{ + nextTxID: atomic.Uint64{}, + walPath: walPath, + checkpointInterval: 300, // 5 минут + lastCheckpoint: time.Now().Unix(), + } + globalTxManager.nextTxID.Store(1) + err = globalTxManager.initWAL(walPath) + if err == nil { + // Восстанавливаем состояние из WAL + go globalTxManager.recoverFromWAL() + // Запускаем периодическое создание чекпоинтов + go globalTxManager.checkpointLoop() + } + }) + return err +} + +// initWAL инициализирует Write-Ahead Log +func (tm *TransactionManager) initWAL(walPath string) error { + wal, err := NewWALManager(walPath) + if err != nil { + return err + } + tm.wal = wal + return nil +} + +// checkpointLoop периодически создаёт чекпоинты +func (tm *TransactionManager) checkpointLoop() { + ticker := time.NewTicker(time.Duration(tm.checkpointInterval) * time.Second) + defer ticker.Stop() + + for range ticker.C { + tm.createCheckpoint() + } +} + +// createCheckpoint создаёт чекпоинт текущего состояния +func (tm *TransactionManager) createCheckpoint() { + now := time.Now().Unix() + if now-tm.lastCheckpoint < tm.checkpointInterval { + return + } + + // Создаём чекпоинт-файл + checkpointPath := fmt.Sprintf("%s.checkpoint.%d", tm.walPath, now) + + // Собираем состояние всех активных транзакций + checkpoint := make(map[string]interface{}) + checkpoint["timestamp"] = now + checkpoint["last_lsn"] = tm.wal.currentLSN + + transactions := make([]*Transaction, 0) + tm.activeTransactions.Range(func(key, value interface{}) bool { + tx := value.(*Transaction) + transactions = append(transactions, tx) + return true + }) + checkpoint["transactions"] = transactions + + data, err := json.Marshal(checkpoint) + if err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to marshal checkpoint: %v", err)) + } + return + } + + // Записываем чекпоинт + if err := os.WriteFile(checkpointPath, data, 0644); err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to write checkpoint: %v", err)) + } + return + } + + tm.lastCheckpoint = now + + if tm.logger != nil { + tm.logger.Info(fmt.Sprintf("Checkpoint created: %s", checkpointPath)) + } + + // Удаляем старые чекпоинты + go tm.cleanOldCheckpoints() +} + +// cleanOldCheckpoints удаляет старые чекпоинты +func (tm *TransactionManager) cleanOldCheckpoints() { + files, err := filepath.Glob(fmt.Sprintf("%s.checkpoint.*", tm.walPath)) + if err != nil { + return + } + + // Оставляем только 5 последних чекпоинтов + if len(files) > 5 { + for i := 0; i < len(files)-5; i++ { + os.Remove(files[i]) + } + } +} + +// recoverFromWAL восстанавливает состояние из WAL после сбоя +func (tm *TransactionManager) recoverFromWAL() { + if tm.wal == nil { + return + } + + records, err := tm.wal.ReadAll() + if err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err)) + } + return + } + + for _, record := range records { + if record.Type == 1 { // Transaction record + var txRecord TransactionRecord + if err := json.Unmarshal(record.Data, &txRecord); err != nil { + continue + } + + if txRecord.State == TransactionCommitted { + // Повторно применяем закоммиченные операции + for _, op := range txRecord.Operations { + if err := applyOperation(op); err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err)) + } + } + } + } + // Транзакции в состоянии Active или Aborted игнорируем + } + } + + if tm.logger != nil { + tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records))) + } +} + +// SetTransactionLogger устанавливает логгер для транзакций +func SetTransactionLogger(logger LoggerInterface) { + if globalTxManager != nil { + globalTxManager.logger = logger + } +} + +// BeginTransaction начинает новую транзакцию +func BeginTransaction() *Transaction { + if globalTxManager == nil { + InitTransactionManager("futriis.wal") + } + + tx := &Transaction{ + ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), + StartTime: time.Now().UnixMilli(), + Operations: make([]Operation, 0), + } + tx.State.Store(int32(TransactionActive)) + + globalTxManager.activeTransactions.Store(tx.ID, tx) + currentTx.Store(tx) + + // Записываем начало транзакции в WAL + txData, _ := json.Marshal(TransactionRecord{ + ID: tx.ID, + State: TransactionActive, + Timestamp: tx.StartTime, + Operations: tx.Operations, + }) + + if globalTxManager.wal != nil { + globalTxManager.wal.Write(&WALRecord{ + Type: 1, + Data: txData, + LSN: globalTxManager.wal.currentLSN, + }) + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID)) + } + + AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ + "start_time": tx.StartTime, + }) + + return tx +} + +// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции +func BeginTransactionOnCollection(coll *Collection) error { + if globalTxManager == nil { + if err := InitTransactionManager("futriis.wal"); err != nil { + return err + } + } + + tx := BeginTransaction() + if tx == nil { + return fmt.Errorf("failed to create transaction") + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name)) + } + + return nil +} + +// AddToTransaction добавляет операцию в текущую транзакцию +func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + if TransactionState(tx.State.Load()) != TransactionActive { + return fmt.Errorf("transaction is not active") + } + + op := Operation{ + Type: opType, + Database: coll.dbName, + Collection: coll.name, + DocumentID: docID, + Data: newData, + OldData: oldData, + Version: version, + } + + tx.mu.Lock() + tx.Operations = append(tx.Operations, op) + tx.mu.Unlock() + + if globalTxManager.logger != nil { + globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID)) + } + + return nil +} + +// CommitCurrentTransaction коммитит текущую транзакцию +func CommitCurrentTransaction() error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + if TransactionState(tx.State.Load()) != TransactionActive { + return fmt.Errorf("transaction is not active") + } + + // Применяем все операции атомарно + for _, op := range tx.Operations { + if err := applyOperation(op); err != nil { + // Откатываем при ошибке + AbortCurrentTransaction() + return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err) + } + } + + tx.State.Store(int32(TransactionCommitted)) + + // Записываем коммит в WAL + txData, _ := json.Marshal(TransactionRecord{ + ID: tx.ID, + State: TransactionCommitted, + Timestamp: time.Now().UnixMilli(), + Operations: tx.Operations, + }) + + if globalTxManager.wal != nil { + globalTxManager.wal.Write(&WALRecord{ + Type: 1, + Data: txData, + LSN: globalTxManager.wal.currentLSN, + }) + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations))) + } + + AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ + "operations": len(tx.Operations), + }) + + // Очищаем текущую транзакцию + currentTx.Store(nil) + globalTxManager.activeTransactions.Delete(tx.ID) + + return nil +} + +// AbortCurrentTransaction откатывает текущую транзакцию +func AbortCurrentTransaction() error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + tx.State.Store(int32(TransactionAborted)) + + // Записываем откат в WAL + txData, _ := json.Marshal(TransactionRecord{ + ID: tx.ID, + State: TransactionAborted, + Timestamp: time.Now().UnixMilli(), + Operations: tx.Operations, + }) + + if globalTxManager.wal != nil { + globalTxManager.wal.Write(&WALRecord{ + Type: 1, + Data: txData, + LSN: globalTxManager.wal.currentLSN, + }) + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations))) + } + + AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ + "operations": len(tx.Operations), + }) + + // Очищаем текущую транзакцию + currentTx.Store(nil) + globalTxManager.activeTransactions.Delete(tx.ID) + + return nil +} + +// HasActiveTransaction проверяет наличие активной транзакции +func HasActiveTransaction() bool { + return currentTx.Load() != nil +} + +// GetCurrentTransactionID возвращает ID текущей транзакции +func GetCurrentTransactionID() string { + txVal := currentTx.Load() + if txVal == nil { + return "" + } + tx := txVal.(*Transaction) + return fmt.Sprintf("%d", tx.ID) +} + +// GetActiveTransactions возвращает список активных транзакций для API +func GetActiveTransactions() []TransactionInfo { + if globalTxManager == nil { + return []TransactionInfo{} + } + + transactions := make([]TransactionInfo, 0) + + globalTxManager.activeTransactions.Range(func(key, value interface{}) bool { + tx := value.(*Transaction) + status := "active" + if TransactionState(tx.State.Load()) == TransactionCommitted { + status = "committed" + } else if TransactionState(tx.State.Load()) == TransactionAborted { + status = "aborted" + } + + tx.mu.RLock() + opCount := len(tx.Operations) + operations := make([]OperationInfo, 0, opCount) + for _, op := range tx.Operations { + operations = append(operations, OperationInfo{ + Type: op.Type, + Database: op.Database, + Collection: op.Collection, + DocumentID: op.DocumentID, + }) + } + tx.mu.RUnlock() + + transactions = append(transactions, TransactionInfo{ + ID: fmt.Sprintf("%d", tx.ID), + Status: status, + StartTime: tx.StartTime, + OperationCount: opCount, + Operations: operations, + }) + return true + }) + + return transactions +} + +// GetTransactionByID возвращает транзакцию по ID +func GetTransactionByID(id string) (*Transaction, error) { + if globalTxManager == nil { + return nil, fmt.Errorf("transaction manager not initialized") + } + + var txID TransactionID + fmt.Sscanf(id, "%d", &txID) + + if val, ok := globalTxManager.activeTransactions.Load(txID); ok { + return val.(*Transaction), nil + } + + return nil, fmt.Errorf("transaction not found") +} + +// FindInTransaction ищет документ в контексте транзакции +func FindInTransaction(coll *Collection, id string) (*Document, error) { + txVal := currentTx.Load() + if txVal == nil { + return coll.Find(id) + } + + tx := txVal.(*Transaction) + + tx.mu.RLock() + defer tx.mu.RUnlock() + + // Ищем в операциях транзакции в обратном порядке + for i := len(tx.Operations) - 1; i >= 0; i-- { + op := tx.Operations[i] + if op.DocumentID == id { + if op.Type == "delete" { + return nil, fmt.Errorf("document deleted in transaction") + } + if op.Type == "insert" || op.Type == "update" { + doc := NewDocumentWithID(op.DocumentID) + for k, v := range op.Data { + doc.SetField(k, v) + } + doc.Version = op.Version + return doc, nil + } + } + } + + return coll.Find(id) +} + +// applyOperation применяет операцию к хранилищу +func applyOperation(op Operation) error { + db, err := GetGlobalStorage().GetDatabase(op.Database) + if err != nil { + return fmt.Errorf("database not found: %s", op.Database) + } + + coll, err := db.GetCollection(op.Collection) + if err != nil { + return fmt.Errorf("collection not found: %s", op.Collection) + } + + switch op.Type { + case "insert": + doc := NewDocumentWithID(op.DocumentID) + for k, v := range op.Data { + doc.SetField(k, v) + } + doc.Version = op.Version + return coll.Insert(doc) + + case "update": + return coll.Update(op.DocumentID, op.Data) + + case "delete": + return coll.Delete(op.DocumentID) + } + + return nil +} + +// GetGlobalStorage возвращает глобальное хранилище +var globalStorage *Storage + +// SetGlobalStorage устанавливает глобальное хранилище +func SetGlobalStorage(s *Storage) { + globalStorage = s +} + +// GetGlobalStorage возвращает глобальное хранилище +func GetGlobalStorage() *Storage { + return globalStorage +} + +// AuditLog записывает событие в аудит +func AuditLog(operation, dataType, name string, details map[string]interface{}) { + LogAudit(operation, dataType, name, details) +} + +// MVCCSnapshot создаёт снапшот текущего состояния для MVCC +func MVCCSnapshot() uint64 { + return uint64(time.Now().UnixNano()) +} + +// CreateDocumentVersion создаёт новую версию документа для MVCC +func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { + return &DocumentVersion{ + Document: doc.Clone(), + Timestamp: time.Now().UnixMilli(), + TxID: txID, + } +}