/* * Copyright 2026 Safronov Grigorii * * Licensed under the CDDL, Version 1.0 (the "License"); * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * https://opensource.org/licenses/CDDL-1.0 */ // Файл: internal/storage/transaction.go // Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) // и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. // Полная WAL реализация на чистом Go без сторонних зависимостей. package storage import ( "bufio" "encoding/binary" "encoding/json" "fmt" "os" "path/filepath" "sync" "sync/atomic" "time" ) // TransactionID представляет уникальный идентификатор транзакции type TransactionID uint64 // TransactionState представляет состояние транзакции type TransactionState int32 const ( TransactionActive TransactionState = iota TransactionCommitted TransactionAborted ) // TransactionRecord представляет запись в WAL type TransactionRecord struct { ID TransactionID `json:"id"` State TransactionState `json:"state"` Timestamp int64 `json:"timestamp"` Operations []Operation `json:"operations"` } // Operation представляет одну операцию в транзакции type Operation struct { Type string `json:"type"` // "insert", "update", "delete" Database string `json:"database"` Collection string `json:"collection"` DocumentID string `json:"document_id"` Data map[string]interface{} `json:"data"` Version uint64 `json:"version"` OldData map[string]interface{} `json:"old_data"` // для отката } // DocumentVersion представляет версию документа для MVCC type DocumentVersion struct { Document *Document `json:"document"` Timestamp int64 `json:"timestamp"` TxID TransactionID `json:"tx_id"` } // TransactionInfo представляет информацию о транзакции для API type TransactionInfo struct { ID string `json:"id"` Status string `json:"status"` StartTime int64 `json:"start_time"` OperationCount int `json:"operation_count"` Operations []OperationInfo `json:"operations,omitempty"` } // OperationInfo представляет информацию об операции для API type OperationInfo struct { Type string `json:"type"` Database string `json:"database"` Collection string `json:"collection"` DocumentID string `json:"document_id"` } // WALRecord представляет запись в WAL файле type WALRecord struct { CRC uint32 `json:"crc"` Length uint32 `json:"length"` Type byte `json:"type"` // 1=Transaction, 2=Checkpoint Data []byte `json:"data"` Timestamp int64 `json:"timestamp"` LSN uint64 `json:"lsn"` // Log Sequence Number } // WALManager управляет Write-Ahead Log type WALManager struct { mu sync.RWMutex file *os.File writer *bufio.Writer path string currentLSN uint64 lastSync time.Time syncInterval time.Duration bufferSize int closed bool writeChan chan *WALRecord stopChan chan struct{} wg sync.WaitGroup } // TransactionManager управляет транзакциями type TransactionManager struct { activeTransactions sync.Map // map[TransactionID]*Transaction nextTxID atomic.Uint64 wal *WALManager logger LoggerInterface mu sync.RWMutex walPath string checkpointInterval int64 // интервал создания чекпоинтов в секундах lastCheckpoint int64 checkpointFile *os.File } // Transaction представляет одну транзакцию type Transaction struct { ID TransactionID State atomic.Int32 Operations []Operation StartTime int64 mu sync.RWMutex } // LoggerInterface определяет интерфейс для логирования type LoggerInterface interface { Debug(msg string) Info(msg string) Error(msg string) Warn(msg string) } var ( globalTxManager *TransactionManager txManagerOnce sync.Once currentTx atomic.Value // *Transaction ) // CRC32 таблица для быстрых вычислений var crc32Table = [256]uint32{ 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d, } // crc32 вычисляет CRC32 хеш func crc32(data []byte) uint32 { crc := uint32(0xFFFFFFFF) for _, b := range data { crc = (crc >> 8) ^ crc32Table[(crc^uint32(b))&0xFF] } return crc ^ 0xFFFFFFFF } // NewWALManager создаёт новый WAL менеджер func NewWALManager(path string) (*WALManager, error) { dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("failed to create WAL directory: %v", err) } file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) if err != nil { return nil, fmt.Errorf("failed to open WAL file: %v", err) } // Определяем текущий LSN из файла var currentLSN uint64 = 1 stat, err := file.Stat() if err == nil && stat.Size() > 0 { // Читаем последнюю запись для определения LSN currentLSN = uint64(stat.Size()) / 100 // приблизительное значение if currentLSN < 1 { currentLSN = 1 } } wm := &WALManager{ file: file, writer: bufio.NewWriterSize(file, 64*1024), // 64KB буфер path: path, currentLSN: currentLSN, lastSync: time.Now(), syncInterval: 5 * time.Second, bufferSize: 64 * 1024, writeChan: make(chan *WALRecord, 10000), stopChan: make(chan struct{}), } // Запускаем фоновую запись wm.wg.Add(1) go wm.writerLoop() return wm, nil } // writerLoop асинхронно записывает записи в WAL func (wm *WALManager) writerLoop() { defer wm.wg.Done() batch := make([]*WALRecord, 0, 100) ticker := time.NewTicker(wm.syncInterval) defer ticker.Stop() for { select { case record, ok := <-wm.writeChan: if !ok { // Канал закрыт, сбрасываем оставшиеся записи if len(batch) > 0 { wm.flushBatch(batch) } return } batch = append(batch, record) if len(batch) >= 100 { wm.flushBatch(batch) batch = batch[:0] } case <-ticker.C: if len(batch) > 0 { wm.flushBatch(batch) batch = batch[:0] } wm.sync() case <-wm.stopChan: if len(batch) > 0 { wm.flushBatch(batch) } wm.sync() return } } } // flushBatch записывает пакет записей func (wm *WALManager) flushBatch(batch []*WALRecord) { wm.mu.Lock() defer wm.mu.Unlock() for _, record := range batch { // Сериализуем запись data, err := json.Marshal(record) if err != nil { continue } // Записываем размер (4 байта) lenBuf := make([]byte, 4) binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) if _, err := wm.writer.Write(lenBuf); err != nil { continue } // Записываем данные if _, err := wm.writer.Write(data); err != nil { continue } // Обновляем LSN record.LSN = wm.currentLSN wm.currentLSN++ } } // sync синхронизирует буфер с диском func (wm *WALManager) sync() { wm.mu.Lock() defer wm.mu.Unlock() if time.Since(wm.lastSync) >= wm.syncInterval { wm.writer.Flush() wm.file.Sync() wm.lastSync = time.Now() } } // Write записывает запись в WAL func (wm *WALManager) Write(record *WALRecord) error { if wm.closed { return fmt.Errorf("WAL is closed") } record.Timestamp = time.Now().UnixMilli() // Вычисляем CRC data, err := json.Marshal(record.Data) if err != nil { return err } record.CRC = crc32(data) select { case wm.writeChan <- record: return nil case <-time.After(5 * time.Second): return fmt.Errorf("WAL write timeout") } } // ReadAll читает все записи из WAL func (wm *WALManager) ReadAll() ([]*WALRecord, error) { wm.mu.RLock() defer wm.mu.RUnlock() // Сбрасываем буферы wm.writer.Flush() // Открываем файл для чтения file, err := os.Open(wm.path) if err != nil { return nil, err } defer file.Close() records := make([]*WALRecord, 0) reader := bufio.NewReader(file) lenBuf := make([]byte, 4) for { _, err := reader.Read(lenBuf) if err != nil { break } recordLen := binary.BigEndian.Uint32(lenBuf) recordData := make([]byte, recordLen) _, err = reader.Read(recordData) if err != nil { break } var record WALRecord if err := json.Unmarshal(recordData, &record); err != nil { continue } // Проверяем CRC data, _ := json.Marshal(record.Data) if crc32(data) != record.CRC { continue // Пропускаем повреждённые записи } records = append(records, &record) } return records, nil } // Close закрывает WAL func (wm *WALManager) Close() error { wm.mu.Lock() wm.closed = true wm.mu.Unlock() close(wm.stopChan) close(wm.writeChan) wm.wg.Wait() wm.mu.Lock() defer wm.mu.Unlock() if err := wm.writer.Flush(); err != nil { return err } return wm.file.Close() } // InitTransactionManager инициализирует глобальный менеджер транзакций func InitTransactionManager(walPath string) error { var err error txManagerOnce.Do(func() { globalTxManager = &TransactionManager{ nextTxID: atomic.Uint64{}, walPath: walPath, checkpointInterval: 300, // 5 минут lastCheckpoint: time.Now().Unix(), } globalTxManager.nextTxID.Store(1) err = globalTxManager.initWAL(walPath) if err == nil { // Восстанавливаем состояние из WAL go globalTxManager.recoverFromWAL() // Запускаем периодическое создание чекпоинтов go globalTxManager.checkpointLoop() } }) return err } // initWAL инициализирует Write-Ahead Log func (tm *TransactionManager) initWAL(walPath string) error { wal, err := NewWALManager(walPath) if err != nil { return err } tm.wal = wal return nil } // checkpointLoop периодически создаёт чекпоинты func (tm *TransactionManager) checkpointLoop() { ticker := time.NewTicker(time.Duration(tm.checkpointInterval) * time.Second) defer ticker.Stop() for range ticker.C { tm.createCheckpoint() } } // createCheckpoint создаёт чекпоинт текущего состояния func (tm *TransactionManager) createCheckpoint() { now := time.Now().Unix() if now-tm.lastCheckpoint < tm.checkpointInterval { return } // Создаём чекпоинт-файл checkpointPath := fmt.Sprintf("%s.checkpoint.%d", tm.walPath, now) // Собираем состояние всех активных транзакций checkpoint := make(map[string]interface{}) checkpoint["timestamp"] = now checkpoint["last_lsn"] = tm.wal.currentLSN transactions := make([]*Transaction, 0) tm.activeTransactions.Range(func(key, value interface{}) bool { tx := value.(*Transaction) transactions = append(transactions, tx) return true }) checkpoint["transactions"] = transactions data, err := json.Marshal(checkpoint) if err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to marshal checkpoint: %v", err)) } return } // Записываем чекпоинт if err := os.WriteFile(checkpointPath, data, 0644); err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to write checkpoint: %v", err)) } return } tm.lastCheckpoint = now if tm.logger != nil { tm.logger.Info(fmt.Sprintf("Checkpoint created: %s", checkpointPath)) } // Удаляем старые чекпоинты go tm.cleanOldCheckpoints() } // cleanOldCheckpoints удаляет старые чекпоинты func (tm *TransactionManager) cleanOldCheckpoints() { files, err := filepath.Glob(fmt.Sprintf("%s.checkpoint.*", tm.walPath)) if err != nil { return } // Оставляем только 5 последних чекпоинтов if len(files) > 5 { for i := 0; i < len(files)-5; i++ { os.Remove(files[i]) } } } // recoverFromWAL восстанавливает состояние из WAL после сбоя func (tm *TransactionManager) recoverFromWAL() { if tm.wal == nil { return } records, err := tm.wal.ReadAll() if err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err)) } return } for _, record := range records { if record.Type == 1 { // Transaction record var txRecord TransactionRecord if err := json.Unmarshal(record.Data, &txRecord); err != nil { continue } if txRecord.State == TransactionCommitted { // Повторно применяем закоммиченные операции for _, op := range txRecord.Operations { if err := applyOperation(op); err != nil { if tm.logger != nil { tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err)) } } } } // Транзакции в состоянии Active или Aborted игнорируем } } if tm.logger != nil { tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records))) } } // SetTransactionLogger устанавливает логгер для транзакций func SetTransactionLogger(logger LoggerInterface) { if globalTxManager != nil { globalTxManager.logger = logger } } // BeginTransaction начинает новую транзакцию func BeginTransaction() *Transaction { if globalTxManager == nil { InitTransactionManager("futriis.wal") } tx := &Transaction{ ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), StartTime: time.Now().UnixMilli(), Operations: make([]Operation, 0), } tx.State.Store(int32(TransactionActive)) globalTxManager.activeTransactions.Store(tx.ID, tx) currentTx.Store(tx) // Записываем начало транзакции в WAL txData, _ := json.Marshal(TransactionRecord{ ID: tx.ID, State: TransactionActive, Timestamp: tx.StartTime, Operations: tx.Operations, }) if globalTxManager.wal != nil { globalTxManager.wal.Write(&WALRecord{ Type: 1, Data: txData, LSN: globalTxManager.wal.currentLSN, }) } if globalTxManager.logger != nil { globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID)) } AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ "start_time": tx.StartTime, }) return tx } // BeginTransactionOnCollection начинает транзакцию для конкретной коллекции func BeginTransactionOnCollection(coll *Collection) error { if globalTxManager == nil { if err := InitTransactionManager("futriis.wal"); err != nil { return err } } tx := BeginTransaction() if tx == nil { return fmt.Errorf("failed to create transaction") } if globalTxManager.logger != nil { globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name)) } return nil } // AddToTransaction добавляет операцию в текущую транзакцию func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) if TransactionState(tx.State.Load()) != TransactionActive { return fmt.Errorf("transaction is not active") } op := Operation{ Type: opType, Database: coll.dbName, Collection: coll.name, DocumentID: docID, Data: newData, OldData: oldData, Version: version, } tx.mu.Lock() tx.Operations = append(tx.Operations, op) tx.mu.Unlock() if globalTxManager.logger != nil { globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID)) } return nil } // CommitCurrentTransaction коммитит текущую транзакцию func CommitCurrentTransaction() error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) if TransactionState(tx.State.Load()) != TransactionActive { return fmt.Errorf("transaction is not active") } // Применяем все операции атомарно for _, op := range tx.Operations { if err := applyOperation(op); err != nil { // Откатываем при ошибке AbortCurrentTransaction() return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err) } } tx.State.Store(int32(TransactionCommitted)) // Записываем коммит в WAL txData, _ := json.Marshal(TransactionRecord{ ID: tx.ID, State: TransactionCommitted, Timestamp: time.Now().UnixMilli(), Operations: tx.Operations, }) if globalTxManager.wal != nil { globalTxManager.wal.Write(&WALRecord{ Type: 1, Data: txData, LSN: globalTxManager.wal.currentLSN, }) } if globalTxManager.logger != nil { globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations))) } AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ "operations": len(tx.Operations), }) // Очищаем текущую транзакцию currentTx.Store(nil) globalTxManager.activeTransactions.Delete(tx.ID) return nil } // AbortCurrentTransaction откатывает текущую транзакцию func AbortCurrentTransaction() error { txVal := currentTx.Load() if txVal == nil { return fmt.Errorf("no active transaction") } tx := txVal.(*Transaction) tx.State.Store(int32(TransactionAborted)) // Записываем откат в WAL txData, _ := json.Marshal(TransactionRecord{ ID: tx.ID, State: TransactionAborted, Timestamp: time.Now().UnixMilli(), Operations: tx.Operations, }) if globalTxManager.wal != nil { globalTxManager.wal.Write(&WALRecord{ Type: 1, Data: txData, LSN: globalTxManager.wal.currentLSN, }) } if globalTxManager.logger != nil { globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations))) } AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ "operations": len(tx.Operations), }) // Очищаем текущую транзакцию currentTx.Store(nil) globalTxManager.activeTransactions.Delete(tx.ID) return nil } // HasActiveTransaction проверяет наличие активной транзакции func HasActiveTransaction() bool { return currentTx.Load() != nil } // GetCurrentTransactionID возвращает ID текущей транзакции func GetCurrentTransactionID() string { txVal := currentTx.Load() if txVal == nil { return "" } tx := txVal.(*Transaction) return fmt.Sprintf("%d", tx.ID) } // GetActiveTransactions возвращает список активных транзакций для API func GetActiveTransactions() []TransactionInfo { if globalTxManager == nil { return []TransactionInfo{} } transactions := make([]TransactionInfo, 0) globalTxManager.activeTransactions.Range(func(key, value interface{}) bool { tx := value.(*Transaction) status := "active" if TransactionState(tx.State.Load()) == TransactionCommitted { status = "committed" } else if TransactionState(tx.State.Load()) == TransactionAborted { status = "aborted" } tx.mu.RLock() opCount := len(tx.Operations) operations := make([]OperationInfo, 0, opCount) for _, op := range tx.Operations { operations = append(operations, OperationInfo{ Type: op.Type, Database: op.Database, Collection: op.Collection, DocumentID: op.DocumentID, }) } tx.mu.RUnlock() transactions = append(transactions, TransactionInfo{ ID: fmt.Sprintf("%d", tx.ID), Status: status, StartTime: tx.StartTime, OperationCount: opCount, Operations: operations, }) return true }) return transactions } // GetTransactionByID возвращает транзакцию по ID func GetTransactionByID(id string) (*Transaction, error) { if globalTxManager == nil { return nil, fmt.Errorf("transaction manager not initialized") } var txID TransactionID fmt.Sscanf(id, "%d", &txID) if val, ok := globalTxManager.activeTransactions.Load(txID); ok { return val.(*Transaction), nil } return nil, fmt.Errorf("transaction not found") } // FindInTransaction ищет документ в контексте транзакции func FindInTransaction(coll *Collection, id string) (*Document, error) { txVal := currentTx.Load() if txVal == nil { return coll.Find(id) } tx := txVal.(*Transaction) tx.mu.RLock() defer tx.mu.RUnlock() // Ищем в операциях транзакции в обратном порядке for i := len(tx.Operations) - 1; i >= 0; i-- { op := tx.Operations[i] if op.DocumentID == id { if op.Type == "delete" { return nil, fmt.Errorf("document deleted in transaction") } if op.Type == "insert" || op.Type == "update" { doc := NewDocumentWithID(op.DocumentID) for k, v := range op.Data { doc.SetField(k, v) } doc.Version = op.Version return doc, nil } } } return coll.Find(id) } // applyOperation применяет операцию к хранилищу func applyOperation(op Operation) error { db, err := GetGlobalStorage().GetDatabase(op.Database) if err != nil { return fmt.Errorf("database not found: %s", op.Database) } coll, err := db.GetCollection(op.Collection) if err != nil { return fmt.Errorf("collection not found: %s", op.Collection) } switch op.Type { case "insert": doc := NewDocumentWithID(op.DocumentID) for k, v := range op.Data { doc.SetField(k, v) } doc.Version = op.Version return coll.Insert(doc) case "update": return coll.Update(op.DocumentID, op.Data) case "delete": return coll.Delete(op.DocumentID) } return nil } // GetGlobalStorage возвращает глобальное хранилище var globalStorage *Storage // SetGlobalStorage устанавливает глобальное хранилище func SetGlobalStorage(s *Storage) { globalStorage = s } // GetGlobalStorage возвращает глобальное хранилище func GetGlobalStorage() *Storage { return globalStorage } // AuditLog записывает событие в аудит func AuditLog(operation, dataType, name string, details map[string]interface{}) { LogAudit(operation, dataType, name, details) } // MVCCSnapshot создаёт снапшот текущего состояния для MVCC func MVCCSnapshot() uint64 { return uint64(time.Now().UnixNano()) } // CreateDocumentVersion создаёт новую версию документа для MVCC func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { return &DocumentVersion{ Document: doc.Clone(), Timestamp: time.Now().UnixMilli(), TxID: txID, } }