From 8d10e2617fb040c24b56f2568c972c67aa517579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Fri, 1 May 2026 17:07:26 +0000 Subject: [PATCH] Upload files to "internal/storage" --- internal/storage/engine.go | 431 +++++++++++++++++++ internal/storage/transaction.go | 722 ++++++++++++++++++++++++++++++++ 2 files changed, 1153 insertions(+) create mode 100644 internal/storage/engine.go create mode 100644 internal/storage/transaction.go diff --git a/internal/storage/engine.go b/internal/storage/engine.go new file mode 100644 index 0000000..6694104 --- /dev/null +++ b/internal/storage/engine.go @@ -0,0 +1,431 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/engine.go +// Назначение: In-memory движок хранения документов с поддержкой коллекций, +// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей. +// Полностью wait-free с использованием sync.Map и атомарных операций. + +package storage + +import ( + "fmt" + "os" + "sync" + "sync/atomic" + + "futriis/internal/log" + "futriis/internal/serializer" +) + +// Storage представляет основное хранилище баз данных +type Storage struct { + databases sync.Map // map[string]*Database + pageSize int64 + logger *log.Logger + totalDocs atomic.Int64 +} + +// Database представляет базу данных (аналог слайса в реляционных СУБД) +type Database struct { + name string + collections sync.Map // map[string]*Collection +} + +// NewStorage создаёт новый экземпляр хранилища +func NewStorage(pageSizeMB int, logger *log.Logger) *Storage { + return &Storage{ + pageSize: int64(pageSizeMB) * 1024 * 1024, + logger: logger, + } +} + +// CreateDatabase создаёт новую базу данных +func (s *Storage) CreateDatabase(name string) error { + if _, exists := s.databases.LoadOrStore(name, &Database{name: name}); exists { + return fmt.Errorf("database already exists") + } + AuditDatabaseOperation("CREATE", name) + if s.logger != nil { + s.logger.Info("Database created: " + name) + } + return nil +} + +// GetDatabase возвращает базу данных по имени +func (s *Storage) GetDatabase(name string) (*Database, error) { + if val, ok := s.databases.Load(name); ok { + return val.(*Database), nil + } + return nil, fmt.Errorf("database not found") +} + +// DropDatabase удаляет базу данных +func (s *Storage) DropDatabase(name string) error { + if _, ok := s.databases.LoadAndDelete(name); !ok { + return fmt.Errorf("database not found") + } + AuditDatabaseOperation("DROP", name) + if s.logger != nil { + s.logger.Info("Database dropped: " + name) + } + return nil +} + +// ListDatabases возвращает список всех баз данных +func (s *Storage) ListDatabases() []string { + databases := make([]string, 0) + s.databases.Range(func(key, value interface{}) bool { + databases = append(databases, key.(string)) + return true + }) + return databases +} + +// Name возвращает имя базы данных +func (db *Database) Name() string { + return db.name +} + +// CreateCollection создаёт новую коллекцию в базе данных +func (db *Database) CreateCollection(name string) error { + if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, nil)); exists { + return fmt.Errorf("collection already exists") + } + AuditCollectionOperation("CREATE", db.name, name, nil) + return nil +} + +// CreateCollectionWithSettings создаёт коллекцию с настройками +func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error { + if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, settings)); exists { + return fmt.Errorf("collection already exists") + } + AuditCollectionOperation("CREATE", db.name, name, settings) + return nil +} + +// GetCollection возвращает коллекцию по имени +func (db *Database) GetCollection(name string) (*Collection, error) { + if val, ok := db.collections.Load(name); ok { + return val.(*Collection), nil + } + return nil, fmt.Errorf("collection not found") +} + +// DropCollection удаляет коллекцию +func (db *Database) DropCollection(name string) error { + if _, ok := db.collections.LoadAndDelete(name); !ok { + return fmt.Errorf("collection not found") + } + AuditCollectionOperation("DROP", db.name, name, nil) + return nil +} + +// ListCollections возвращает список всех коллекций в базе данных +func (db *Database) ListCollections() []string { + collections := make([]string, 0) + db.collections.Range(func(key, value interface{}) bool { + collections = append(collections, key.(string)) + return true + }) + return collections +} + +// GetTotalDocuments возвращает общее количество документов во всех коллекциях +func (s *Storage) GetTotalDocuments() int64 { + return s.totalDocs.Load() +} + +// GetPageSize возвращает размер страницы памяти +func (s *Storage) GetPageSize() int64 { + return s.pageSize +} + +// SerializeDatabase сериализует всю базу данных в MessagePack +func (db *Database) SerializeDatabase() ([]byte, error) { + dbData := make(map[string]interface{}) + + db.collections.Range(func(key, value interface{}) bool { + coll := value.(*Collection) + collData := make(map[string]interface{}) + + docs := coll.GetAllDocuments() + collDocs := make([]*Document, 0, len(docs)) + for _, doc := range docs { + collDocs = append(collDocs, doc) + } + collData["documents"] = collDocs + collData["metadata"] = coll.GetMetadata() + + dbData[key.(string)] = collData + return true + }) + + return serializer.Marshal(dbData) +} + +// DeserializeDatabase десериализует базу данных из MessagePack +func (db *Database) DeserializeDatabase(data []byte) error { + var dbData map[string]interface{} + if err := serializer.Unmarshal(data, &dbData); err != nil { + return err + } + + for collName, collDataRaw := range dbData { + collData, ok := collDataRaw.(map[string]interface{}) + if !ok { + continue + } + + settings := &CollectionSettings{ + MaxDocuments: 0, + ValidateSchema: false, + AutoIndexID: true, + TTLSeconds: 0, + } + + if metaRaw, ok := collData["metadata"]; ok { + if meta, ok := metaRaw.(map[string]interface{}); ok { + if settingsRaw, ok := meta["settings"]; ok { + if settingsMap, ok := settingsRaw.(map[string]interface{}); ok { + if maxDocs, ok := settingsMap["max_documents"]; ok { + if v, ok := maxDocs.(int); ok { + settings.MaxDocuments = v + } else if v, ok := maxDocs.(int64); ok { + settings.MaxDocuments = int(v) + } else if v, ok := maxDocs.(float64); ok { + settings.MaxDocuments = int(v) + } + } + if validateSchema, ok := settingsMap["validate_schema"]; ok { + if v, ok := validateSchema.(bool); ok { + settings.ValidateSchema = v + } + } + if autoIndexID, ok := settingsMap["auto_index_id"]; ok { + if v, ok := autoIndexID.(bool); ok { + settings.AutoIndexID = v + } + } + if ttlSeconds, ok := settingsMap["ttl_seconds"]; ok { + if v, ok := ttlSeconds.(int); ok { + settings.TTLSeconds = v + } else if v, ok := ttlSeconds.(int64); ok { + settings.TTLSeconds = int(v) + } else if v, ok := ttlSeconds.(float64); ok { + settings.TTLSeconds = int(v) + } + } + } + } + } else if meta, ok := metaRaw.(*CollectionMetadata); ok { + if meta.Settings != nil { + settings = meta.Settings + } + } + } + + coll := NewCollection(db.name, collName, settings) + + if docsRaw, ok := collData["documents"]; ok { + if docs, ok := docsRaw.([]interface{}); ok { + for _, docRaw := range docs { + if doc, ok := docRaw.(*Document); ok { + coll.Insert(doc) + } else if docMap, ok := docRaw.(map[string]interface{}); ok { + doc := NewDocument() + if id, ok := docMap["ID"].(string); ok { + doc.ID = id + } else if id, ok := docMap["id"].(string); ok { + doc.ID = id + } + if fields, ok := docMap["fields"]; ok { + if fieldsMap, ok := fields.(map[string]interface{}); ok { + for k, v := range fieldsMap { + doc.SetField(k, v) + } + } + } + if createdAt, ok := docMap["created_at"]; ok { + if v, ok := createdAt.(int64); ok { + doc.CreatedAt = v + } else if v, ok := createdAt.(float64); ok { + doc.CreatedAt = int64(v) + } + } + if updatedAt, ok := docMap["updated_at"]; ok { + if v, ok := updatedAt.(int64); ok { + doc.UpdatedAt = v + } else if v, ok := updatedAt.(float64); ok { + doc.UpdatedAt = int64(v) + } + } + if version, ok := docMap["version"]; ok { + if v, ok := version.(uint64); ok { + doc.Version = v + } else if v, ok := version.(int64); ok { + doc.Version = uint64(v) + } else if v, ok := version.(float64); ok { + doc.Version = uint64(v) + } + } + coll.Insert(doc) + } + } + } else if docs, ok := docsRaw.([]*Document); ok { + for _, doc := range docs { + coll.Insert(doc) + } + } + } + + db.collections.Store(collName, coll) + AuditCollectionOperation("RESTORE", db.name, collName, settings) + } + + return nil +} + +// GetDatabaseNames возвращает имена всех баз данных +func (s *Storage) GetDatabaseNames() []string { + return s.ListDatabases() +} + +// ExistsDatabase проверяет существование базы данных +func (s *Storage) ExistsDatabase(name string) bool { + _, ok := s.databases.Load(name) + return ok +} + +// GetDatabaseCount возвращает количество баз данных +func (s *Storage) GetDatabaseCount() int { + count := 0 + s.databases.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count +} + +// ========== Дополнительные методы для управления хранилищем ========== + +// Backup создаёт резервную копию всех данных +func (s *Storage) Backup(backupPath string) error { + if s.logger != nil { + s.logger.Info(fmt.Sprintf("Starting backup to %s", backupPath)) + } + + backup := make(map[string]interface{}) + + s.databases.Range(func(key, value interface{}) bool { + dbName := key.(string) + db := value.(*Database) + + dbData, err := db.SerializeDatabase() + if err != nil { + if s.logger != nil { + s.logger.Error(fmt.Sprintf("Failed to serialize database %s: %v", dbName, err)) + } + return false + } + + backup[dbName] = dbData + return true + }) + + data, err := serializer.Marshal(backup) + if err != nil { + return fmt.Errorf("failed to marshal backup: %v", err) + } + + if err := os.WriteFile(backupPath, data, 0644); err != nil { + return fmt.Errorf("failed to write backup: %v", err) + } + + if s.logger != nil { + s.logger.Info(fmt.Sprintf("Backup completed: %s", backupPath)) + } + return nil +} + +// Restore восстанавливает данные из резервной копии +func (s *Storage) Restore(backupPath string) error { + if s.logger != nil { + s.logger.Info(fmt.Sprintf("Starting restore from %s", backupPath)) + } + + data, err := os.ReadFile(backupPath) + if err != nil { + return fmt.Errorf("failed to read backup file: %v", err) + } + + var backup map[string][]byte + if err := serializer.Unmarshal(data, &backup); err != nil { + return fmt.Errorf("failed to unmarshal backup: %v", err) + } + + for dbName, dbData := range backup { + if !s.ExistsDatabase(dbName) { + if err := s.CreateDatabase(dbName); err != nil { + return fmt.Errorf("failed to create database %s: %v", dbName, err) + } + } + + db, err := s.GetDatabase(dbName) + if err != nil { + return fmt.Errorf("failed to get database %s: %v", dbName, err) + } + + if err := db.DeserializeDatabase(dbData); err != nil { + return fmt.Errorf("failed to restore database %s: %v", dbName, err) + } + } + + if s.logger != nil { + s.logger.Info("Restore completed") + } + return nil +} + +// GetStats возвращает статистику хранилища +func (s *Storage) GetStats() map[string]interface{} { + stats := map[string]interface{}{ + "total_databases": s.GetDatabaseCount(), + "total_documents": s.GetTotalDocuments(), + "page_size_bytes": s.pageSize, + } + + databases := make([]map[string]interface{}, 0) + s.databases.Range(func(key, value interface{}) bool { + db := value.(*Database) + dbStats := map[string]interface{}{ + "name": db.name, + "collections": len(db.ListCollections()), + } + + totalDocs := int64(0) + totalSize := int64(0) + db.collections.Range(func(k, v interface{}) bool { + coll := v.(*Collection) + totalDocs += coll.Count() + totalSize += coll.Size() + return true + }) + + dbStats["documents"] = totalDocs + dbStats["size_bytes"] = totalSize + databases = append(databases, dbStats) + return true + }) + + stats["databases"] = databases + return stats +} diff --git a/internal/storage/transaction.go b/internal/storage/transaction.go new file mode 100644 index 0000000..154dc99 --- /dev/null +++ b/internal/storage/transaction.go @@ -0,0 +1,722 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/transaction.go +// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) +// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. + +package storage + +import ( + "encoding/binary" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "futriis/internal/serializer" +) + +// TransactionID представляет уникальный идентификатор транзакции +type TransactionID uint64 + +// TransactionState представляет состояние транзакции +type TransactionState int32 + +const ( + TransactionActive TransactionState = iota + TransactionCommitted + TransactionAborted +) + +// TransactionRecord представляет запись в WAL +type TransactionRecord struct { + ID TransactionID `msgpack:"id"` + State TransactionState `msgpack:"state"` + Timestamp int64 `msgpack:"timestamp"` + Operations []Operation `msgpack:"operations"` +} + +// Operation представляет одну операцию в транзакции +type Operation struct { + Type string `msgpack:"type"` // "insert", "update", "delete" + Database string `msgpack:"database"` + Collection string `msgpack:"collection"` + DocumentID string `msgpack:"document_id"` + Data map[string]interface{} `msgpack:"data"` + Version uint64 `msgpack:"version"` + OldData map[string]interface{} `msgpack:"old_data"` // для отката +} + +// DocumentVersion представляет версию документа для MVCC +type DocumentVersion struct { + Document *Document `msgpack:"document"` + Timestamp int64 `msgpack:"timestamp"` + TxID TransactionID `msgpack:"tx_id"` +} + +// TransactionInfo представляет информацию о транзакции для API +type TransactionInfo struct { + ID string `json:"id"` + Status string `json:"status"` + StartTime int64 `json:"start_time"` + OperationCount int `json:"operation_count"` + Operations []OperationInfo `json:"operations,omitempty"` +} + +// OperationInfo представляет информацию об операции для API +type OperationInfo struct { + Type string `json:"type"` + Database string `json:"database"` + Collection string `json:"collection"` + DocumentID string `json:"document_id"` +} + +// LoggerInterface определяет интерфейс для логирования +type LoggerInterface interface { + Debug(msg string) + Info(msg string) + Error(msg string) + Warn(msg string) +} + +// TransactionManager управляет транзакциями +type TransactionManager struct { + activeTransactions sync.Map // map[TransactionID]*Transaction + nextTxID atomic.Uint64 + wal *WriteAheadLog + logger LoggerInterface + mu sync.RWMutex + walPath string + snapshotInterval int64 // интервал создания снапшотов в секундах + lastSnapshot int64 +} + +// Transaction представляет одну транзакцию +type Transaction struct { + ID TransactionID + State atomic.Int32 + Operations []Operation + StartTime int64 + mu sync.RWMutex +} + +// WriteAheadLog реализует журнал предзаписи +type WriteAheadLog struct { + file *os.File + writeChan chan []byte + done chan struct{} + mu sync.RWMutex + syncInterval time.Duration + lastSync time.Time +} + +var ( + globalTxManager *TransactionManager + txManagerOnce sync.Once + currentTx atomic.Value // *Transaction +) + +// InitTransactionManager инициализирует глобальный менеджер транзакций +func InitTransactionManager(walPath string) error { + var err error + txManagerOnce.Do(func() { + globalTxManager = &TransactionManager{ + nextTxID: atomic.Uint64{}, + walPath: walPath, + snapshotInterval: 300, // 5 минут + lastSnapshot: time.Now().Unix(), + } + globalTxManager.nextTxID.Store(1) + err = globalTxManager.initWAL(walPath) + if err == nil { + // Восстанавливаем состояние из WAL + go globalTxManager.recoverFromWAL() + // Запускаем периодическое создание снапшотов + go globalTxManager.snapshotLoop() + } + }) + return err +} + +// SetTransactionLogger устанавливает логгер для транзакций +func SetTransactionLogger(logger LoggerInterface) { + if globalTxManager != nil { + globalTxManager.logger = logger + } +} + +// initWAL инициализирует Write-Ahead Log +func (tm *TransactionManager) initWAL(walPath string) error { + wal, err := NewWriteAheadLog(walPath) + if err != nil { + return err + } + tm.wal = wal + return nil +} + +// NewWriteAheadLog создаёт новый WAL +func NewWriteAheadLog(path string) (*WriteAheadLog, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) + if err != nil { + return nil, err + } + + wal := &WriteAheadLog{ + file: file, + writeChan: make(chan []byte, 10000), + done: make(chan struct{}), + syncInterval: 5 * time.Second, + lastSync: time.Now(), + } + + go wal.writerLoop() + + return wal, nil +} + +// writerLoop асинхронно записывает данные в WAL +func (wal *WriteAheadLog) writerLoop() { + batch := make([][]byte, 0, 100) + ticker := time.NewTicker(wal.syncInterval) + defer ticker.Stop() + + for { + select { + case data, ok := <-wal.writeChan: + if !ok { + if len(batch) > 0 { + wal.flushBatch(batch) + } + close(wal.done) + return + } + batch = append(batch, data) + if len(batch) >= 100 { + wal.flushBatch(batch) + batch = batch[:0] + } + + case <-ticker.C: + if len(batch) > 0 { + wal.flushBatch(batch) + batch = batch[:0] + } + wal.sync() + } + } +} + +// flushBatch записывает пакет данных в файл +func (wal *WriteAheadLog) flushBatch(batch [][]byte) { + wal.mu.Lock() + defer wal.mu.Unlock() + + for _, data := range batch { + lenBuf := make([]byte, 4) + binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) + + if _, err := wal.file.Write(lenBuf); err != nil { + continue + } + if _, err := wal.file.Write(data); err != nil { + continue + } + } +} + +// sync синхронизирует файл с диском +func (wal *WriteAheadLog) sync() { + wal.mu.Lock() + defer wal.mu.Unlock() + + if time.Since(wal.lastSync) >= wal.syncInterval { + wal.file.Sync() + wal.lastSync = time.Now() + } +} + +// Write записывает запись в WAL +func (wal *WriteAheadLog) Write(record *TransactionRecord) error { + data, err := serializer.Marshal(record) + if err != nil { + return err + } + + select { + case wal.writeChan <- data: + return nil + case <-time.After(5 * time.Second): + return fmt.Errorf("WAL write timeout") + } +} + +// Close закрывает WAL +func (wal *WriteAheadLog) Close() error { + close(wal.writeChan) + <-wal.done + return wal.file.Close() +} + +// ReadAll читает все записи из WAL +func (wal *WriteAheadLog) ReadAll() ([]*TransactionRecord, error) { + wal.mu.RLock() + defer wal.mu.RUnlock() + + if _, err := wal.file.Seek(0, 0); err != nil { + return nil, err + } + + records := make([]*TransactionRecord, 0) + buf := make([]byte, 4) + + for { + _, err := wal.file.Read(buf) + if err != nil { + break + } + + recordLen := binary.BigEndian.Uint32(buf) + recordData := make([]byte, recordLen) + _, err = wal.file.Read(recordData) + if err != nil { + break + } + + var record TransactionRecord + if err := serializer.Unmarshal(recordData, &record); err != nil { + continue + } + records = append(records, &record) + } + + return records, nil +} + +// BeginTransaction начинает новую транзакцию +func BeginTransaction() *Transaction { + if globalTxManager == nil { + InitTransactionManager("futriis.wal") + } + + tx := &Transaction{ + ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), + StartTime: time.Now().UnixMilli(), + Operations: make([]Operation, 0), + } + tx.State.Store(int32(TransactionActive)) + + globalTxManager.activeTransactions.Store(tx.ID, tx) + currentTx.Store(tx) + + // Записываем начало транзакции в WAL + record := &TransactionRecord{ + ID: tx.ID, + State: TransactionActive, + Timestamp: tx.StartTime, + Operations: tx.Operations, + } + if globalTxManager.wal != nil { + globalTxManager.wal.Write(record) + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID)) + } + + AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ + "start_time": tx.StartTime, + }) + + return tx +} + +// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции +func BeginTransactionOnCollection(coll *Collection) error { + if globalTxManager == nil { + if err := InitTransactionManager("futriis.wal"); err != nil { + return err + } + } + + tx := BeginTransaction() + if tx == nil { + return fmt.Errorf("failed to create transaction") + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name)) + } + + return nil +} + +// AddToTransaction добавляет операцию в текущую транзакцию +func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + if TransactionState(tx.State.Load()) != TransactionActive { + return fmt.Errorf("transaction is not active") + } + + op := Operation{ + Type: opType, + Database: coll.dbName, + Collection: coll.name, + DocumentID: docID, + Data: newData, + OldData: oldData, + Version: version, + } + + tx.mu.Lock() + tx.Operations = append(tx.Operations, op) + tx.mu.Unlock() + + if globalTxManager.logger != nil { + globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID)) + } + + return nil +} + +// CommitCurrentTransaction коммитит текущую транзакцию +func CommitCurrentTransaction() error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + if TransactionState(tx.State.Load()) != TransactionActive { + return fmt.Errorf("transaction is not active") + } + + // Применяем все операции атомарно + for _, op := range tx.Operations { + if err := applyOperation(op); err != nil { + // Откатываем при ошибке + AbortCurrentTransaction() + return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err) + } + } + + tx.State.Store(int32(TransactionCommitted)) + + // Записываем коммит в WAL + record := &TransactionRecord{ + ID: tx.ID, + State: TransactionCommitted, + Timestamp: time.Now().UnixMilli(), + Operations: tx.Operations, + } + if globalTxManager.wal != nil { + globalTxManager.wal.Write(record) + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations))) + } + + AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ + "operations": len(tx.Operations), + }) + + // Очищаем текущую транзакцию + currentTx.Store(nil) + globalTxManager.activeTransactions.Delete(tx.ID) + + return nil +} + +// AbortCurrentTransaction откатывает текущую транзакцию +func AbortCurrentTransaction() error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + tx.State.Store(int32(TransactionAborted)) + + // Записываем откат в WAL + record := &TransactionRecord{ + ID: tx.ID, + State: TransactionAborted, + Timestamp: time.Now().UnixMilli(), + Operations: tx.Operations, + } + if globalTxManager.wal != nil { + globalTxManager.wal.Write(record) + } + + if globalTxManager.logger != nil { + globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations))) + } + + AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{ + "operations": len(tx.Operations), + }) + + // Очищаем текущую транзакцию + currentTx.Store(nil) + globalTxManager.activeTransactions.Delete(tx.ID) + + return nil +} + +// HasActiveTransaction проверяет наличие активной транзакции +func HasActiveTransaction() bool { + return currentTx.Load() != nil +} + +// GetCurrentTransactionID возвращает ID текущей транзакции +func GetCurrentTransactionID() string { + txVal := currentTx.Load() + if txVal == nil { + return "" + } + tx := txVal.(*Transaction) + return fmt.Sprintf("%d", tx.ID) +} + +// GetActiveTransactions возвращает список активных транзакций для API +func GetActiveTransactions() []TransactionInfo { + if globalTxManager == nil { + return []TransactionInfo{} + } + + transactions := make([]TransactionInfo, 0) + + globalTxManager.activeTransactions.Range(func(key, value interface{}) bool { + tx := value.(*Transaction) + status := "active" + if TransactionState(tx.State.Load()) == TransactionCommitted { + status = "committed" + } else if TransactionState(tx.State.Load()) == TransactionAborted { + status = "aborted" + } + + tx.mu.RLock() + opCount := len(tx.Operations) + operations := make([]OperationInfo, 0, opCount) + for _, op := range tx.Operations { + operations = append(operations, OperationInfo{ + Type: op.Type, + Database: op.Database, + Collection: op.Collection, + DocumentID: op.DocumentID, + }) + } + tx.mu.RUnlock() + + transactions = append(transactions, TransactionInfo{ + ID: fmt.Sprintf("%d", tx.ID), + Status: status, + StartTime: tx.StartTime, + OperationCount: opCount, + Operations: operations, + }) + return true + }) + + return transactions +} + +// GetTransactionByID возвращает транзакцию по ID +func GetTransactionByID(id string) (*Transaction, error) { + if globalTxManager == nil { + return nil, fmt.Errorf("transaction manager not initialized") + } + + var txID TransactionID + fmt.Sscanf(id, "%d", &txID) + + if val, ok := globalTxManager.activeTransactions.Load(txID); ok { + return val.(*Transaction), nil + } + + return nil, fmt.Errorf("transaction not found") +} + +// FindInTransaction ищет документ в контексте транзакции +func FindInTransaction(coll *Collection, id string) (*Document, error) { + txVal := currentTx.Load() + if txVal == nil { + return coll.Find(id) + } + + tx := txVal.(*Transaction) + + tx.mu.RLock() + defer tx.mu.RUnlock() + + // Ищем в операциях транзакции в обратном порядке + for i := len(tx.Operations) - 1; i >= 0; i-- { + op := tx.Operations[i] + if op.DocumentID == id { + if op.Type == "delete" { + return nil, fmt.Errorf("document deleted in transaction") + } + if op.Type == "insert" || op.Type == "update" { + doc := NewDocumentWithID(op.DocumentID) + for k, v := range op.Data { + doc.SetField(k, v) + } + doc.Version = op.Version + return doc, nil + } + } + } + + return coll.Find(id) +} + +// applyOperation применяет операцию к хранилищу +func applyOperation(op Operation) error { + db, err := GetGlobalStorage().GetDatabase(op.Database) + if err != nil { + return fmt.Errorf("database not found: %s", op.Database) + } + + coll, err := db.GetCollection(op.Collection) + if err != nil { + return fmt.Errorf("collection not found: %s", op.Collection) + } + + switch op.Type { + case "insert": + doc := NewDocumentWithID(op.DocumentID) + for k, v := range op.Data { + doc.SetField(k, v) + } + doc.Version = op.Version + return coll.Insert(doc) + + case "update": + return coll.Update(op.DocumentID, op.Data) + + case "delete": + return coll.Delete(op.DocumentID) + } + + return nil +} + +// recoverFromWAL восстанавливает состояние из WAL после сбоя +func (tm *TransactionManager) recoverFromWAL() { + if tm.wal == nil { + return + } + + records, err := tm.wal.ReadAll() + if err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err)) + } + return + } + + for _, record := range records { + if record.State == TransactionCommitted { + // Повторно применяем закоммиченные операции + for _, op := range record.Operations { + if err := applyOperation(op); err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err)) + } + } + } + } + // Транзакции в состоянии Active или Aborted игнорируем + } + + if tm.logger != nil { + tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records))) + } +} + +// snapshotLoop периодически создаёт снапшоты +func (tm *TransactionManager) snapshotLoop() { + ticker := time.NewTicker(time.Duration(tm.snapshotInterval) * time.Second) + defer ticker.Stop() + + for range ticker.C { + tm.createSnapshot() + } +} + +// createSnapshot создаёт снапшот текущего состояния +func (tm *TransactionManager) createSnapshot() { + now := time.Now().Unix() + if now-tm.lastSnapshot < tm.snapshotInterval { + return + } + + snapshotPath := fmt.Sprintf("%s.snapshot.%d", tm.walPath, now) + if err := GetGlobalStorage().Backup(snapshotPath); err != nil { + if tm.logger != nil { + tm.logger.Error(fmt.Sprintf("Failed to create snapshot: %v", err)) + } + return + } + + tm.lastSnapshot = now + + if tm.logger != nil { + tm.logger.Info(fmt.Sprintf("Snapshot created: %s", snapshotPath)) + } + + // Удаляем старые снапшоты + go tm.cleanOldSnapshots() +} + +// cleanOldSnapshots удаляет старые снапшоты +func (tm *TransactionManager) cleanOldSnapshots() { + // Реализация удаления старых снапшотов +} + +// GetGlobalStorage возвращает глобальное хранилище (должно быть установлено при инициализации) +var globalStorage *Storage + +// SetGlobalStorage устанавливает глобальное хранилище +func SetGlobalStorage(s *Storage) { + globalStorage = s +} + +// GetGlobalStorage возвращает глобальное хранилище +func GetGlobalStorage() *Storage { + return globalStorage +} + +// AuditLog записывает событие в аудит +func AuditLog(operation, dataType, name string, details map[string]interface{}) { + LogAudit(operation, dataType, name, details) +} + +// MVCCSnapshot создаёт снапшот текущего состояния для MVCC +func MVCCSnapshot() uint64 { + return uint64(time.Now().UnixNano()) +} + +// CreateDocumentVersion создаёт новую версию документа для MVCC +func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { + return &DocumentVersion{ + Document: doc.Clone(), + Timestamp: time.Now().UnixMilli(), + TxID: txID, + } +}