diff --git a/internal/storage/engine.go b/internal/storage/engine.go new file mode 100644 index 0000000..164b1a2 --- /dev/null +++ b/internal/storage/engine.go @@ -0,0 +1,319 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/engine.go +// Назначение: In-memory движок хранения документов с поддержкой коллекций, +// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей. +// Полностью wait-free с использованием sync.Map и атомарных операций. + +package storage + +import ( + "fmt" + "sync" + "sync/atomic" + + "futriis/internal/log" + "futriis/internal/serializer" +) + +// Storage представляет основное хранилище баз данных +type Storage struct { + databases sync.Map // map[string]*Database + pageSize int64 + logger *log.Logger + totalDocs atomic.Int64 +} + +// Database представляет базу данных (аналог слайса в реляционных СУБД) +type Database struct { + name string + collections sync.Map // map[string]*Collection +} + +// NewStorage создаёт новый экземпляр хранилища +func NewStorage(pageSizeMB int, logger *log.Logger) *Storage { + return &Storage{ + pageSize: int64(pageSizeMB) * 1024 * 1024, + logger: logger, + } +} + +// CreateDatabase создаёт новую базу данных +func (s *Storage) CreateDatabase(name string) error { + if _, exists := s.databases.LoadOrStore(name, &Database{name: name}); exists { + return fmt.Errorf("database already exists") + } + AuditDatabaseOperation("CREATE", name) + s.logger.Info("Database created: " + name) + return nil +} + +// GetDatabase возвращает базу данных по имени +func (s *Storage) GetDatabase(name string) (*Database, error) { + if val, ok := s.databases.Load(name); ok { + return val.(*Database), nil + } + return nil, fmt.Errorf("database not found") +} + +// DropDatabase удаляет базу данных +func (s *Storage) DropDatabase(name string) error { + if _, ok := s.databases.LoadAndDelete(name); !ok { + return fmt.Errorf("database not found") + } + AuditDatabaseOperation("DROP", name) + s.logger.Info("Database dropped: " + name) + return nil +} + +// ListDatabases возвращает список всех баз данных +func (s *Storage) ListDatabases() []string { + databases := make([]string, 0) + s.databases.Range(func(key, value interface{}) bool { + databases = append(databases, key.(string)) + return true + }) + return databases +} + +// Name возвращает имя базы данных +func (db *Database) Name() string { + return db.name +} + +// CreateCollection создаёт новую коллекцию в базе данных +func (db *Database) CreateCollection(name string) error { + // Исправлено: добавлен параметр dbName + if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, nil)); exists { + return fmt.Errorf("collection already exists") + } + AuditCollectionOperation("CREATE", db.name, name, nil) + return nil +} + +// CreateCollectionWithSettings создаёт коллекцию с настройками +func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error { + // Исправлено: добавлен параметр dbName + if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, settings)); exists { + return fmt.Errorf("collection already exists") + } + AuditCollectionOperation("CREATE", db.name, name, settings) + return nil +} + +// GetCollection возвращает коллекцию по имени +func (db *Database) GetCollection(name string) (*Collection, error) { + if val, ok := db.collections.Load(name); ok { + return val.(*Collection), nil + } + return nil, fmt.Errorf("collection not found") +} + +// DropCollection удаляет коллекцию +func (db *Database) DropCollection(name string) error { + if _, ok := db.collections.LoadAndDelete(name); !ok { + return fmt.Errorf("collection not found") + } + AuditCollectionOperation("DROP", db.name, name, nil) + return nil +} + +// ListCollections возвращает список всех коллекций в базе данных +func (db *Database) ListCollections() []string { + collections := make([]string, 0) + db.collections.Range(func(key, value interface{}) bool { + collections = append(collections, key.(string)) + return true + }) + return collections +} + +// GetTotalDocuments возвращает общее количество документов во всех коллекциях +func (s *Storage) GetTotalDocuments() int64 { + return s.totalDocs.Load() +} + +// GetPageSize возвращает размер страницы памяти +func (s *Storage) GetPageSize() int64 { + return s.pageSize +} + +// SerializeDatabase сериализует всю базу данных в MessagePack +func (db *Database) SerializeDatabase() ([]byte, error) { + dbData := make(map[string]interface{}) + + db.collections.Range(func(key, value interface{}) bool { + coll := value.(*Collection) + collData := make(map[string]interface{}) + + // Собираем все документы коллекции + docs := coll.GetAllDocuments() + collDocs := make([]*Document, 0, len(docs)) + for _, doc := range docs { + collDocs = append(collDocs, doc) + } + collData["documents"] = collDocs + collData["metadata"] = coll.GetMetadata() + + dbData[key.(string)] = collData + return true + }) + + return serializer.Marshal(dbData) +} + +// DeserializeDatabase десериализует базу данных из MessagePack +func (db *Database) DeserializeDatabase(data []byte) error { + var dbData map[string]interface{} + if err := serializer.Unmarshal(data, &dbData); err != nil { + return err + } + + for collName, collDataRaw := range dbData { + collData, ok := collDataRaw.(map[string]interface{}) + if !ok { + continue + } + + // Создаём коллекцию + settings := &CollectionSettings{ + MaxDocuments: 0, + ValidateSchema: false, + AutoIndexID: true, + TTLSeconds: 0, + } + + if metaRaw, ok := collData["metadata"]; ok { + // Пробуем разные варианты десериализации метаданных + if meta, ok := metaRaw.(map[string]interface{}); ok { + if settingsRaw, ok := meta["settings"]; ok { + if settingsMap, ok := settingsRaw.(map[string]interface{}); ok { + if maxDocs, ok := settingsMap["max_documents"]; ok { + if v, ok := maxDocs.(int); ok { + settings.MaxDocuments = v + } else if v, ok := maxDocs.(int64); ok { + settings.MaxDocuments = int(v) + } else if v, ok := maxDocs.(float64); ok { + settings.MaxDocuments = int(v) + } + } + if validateSchema, ok := settingsMap["validate_schema"]; ok { + if v, ok := validateSchema.(bool); ok { + settings.ValidateSchema = v + } + } + if autoIndexID, ok := settingsMap["auto_index_id"]; ok { + if v, ok := autoIndexID.(bool); ok { + settings.AutoIndexID = v + } + } + if ttlSeconds, ok := settingsMap["ttl_seconds"]; ok { + if v, ok := ttlSeconds.(int); ok { + settings.TTLSeconds = v + } else if v, ok := ttlSeconds.(int64); ok { + settings.TTLSeconds = int(v) + } else if v, ok := ttlSeconds.(float64); ok { + settings.TTLSeconds = int(v) + } + } + } + } + } else if meta, ok := metaRaw.(*CollectionMetadata); ok { + if meta.Settings != nil { + settings = meta.Settings + } + } + } + + // Исправлено: добавлен параметр dbName + coll := NewCollection(db.name, collName, settings) + + // Восстанавливаем документы + if docsRaw, ok := collData["documents"]; ok { + if docs, ok := docsRaw.([]interface{}); ok { + for _, docRaw := range docs { + if doc, ok := docRaw.(*Document); ok { + coll.Insert(doc) + } else if docMap, ok := docRaw.(map[string]interface{}); ok { + // Создаём документ из map + doc := NewDocument() + if id, ok := docMap["ID"].(string); ok { + doc.ID = id + } else if id, ok := docMap["id"].(string); ok { + doc.ID = id + } + if fields, ok := docMap["fields"]; ok { + if fieldsMap, ok := fields.(map[string]interface{}); ok { + for k, v := range fieldsMap { + doc.SetField(k, v) + } + } + } + if createdAt, ok := docMap["created_at"]; ok { + if v, ok := createdAt.(int64); ok { + doc.CreatedAt = v + } else if v, ok := createdAt.(float64); ok { + doc.CreatedAt = int64(v) + } + } + if updatedAt, ok := docMap["updated_at"]; ok { + if v, ok := updatedAt.(int64); ok { + doc.UpdatedAt = v + } else if v, ok := updatedAt.(float64); ok { + doc.UpdatedAt = int64(v) + } + } + if version, ok := docMap["version"]; ok { + if v, ok := version.(uint64); ok { + doc.Version = v + } else if v, ok := version.(int64); ok { + doc.Version = uint64(v) + } else if v, ok := version.(float64); ok { + doc.Version = uint64(v) + } + } + coll.Insert(doc) + } + } + } else if docs, ok := docsRaw.([]*Document); ok { + for _, doc := range docs { + coll.Insert(doc) + } + } + } + + db.collections.Store(collName, coll) + AuditCollectionOperation("RESTORE", db.name, collName, settings) + } + + return nil +} + +// GetDatabaseNames возвращает имена всех баз данных +func (s *Storage) GetDatabaseNames() []string { + return s.ListDatabases() +} + +// ExistsDatabase проверяет существование базы данных +func (s *Storage) ExistsDatabase(name string) bool { + _, ok := s.databases.Load(name) + return ok +} + +// GetDatabaseCount возвращает количество баз данных +func (s *Storage) GetDatabaseCount() int { + count := 0 + s.databases.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count +} diff --git a/internal/storage/transaction.go b/internal/storage/transaction.go new file mode 100644 index 0000000..73d006f --- /dev/null +++ b/internal/storage/transaction.go @@ -0,0 +1,461 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/transaction.go +// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control) +// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование. + +package storage + +import ( + "encoding/binary" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "futriis/internal/serializer" +) + +// TransactionID представляет уникальный идентификатор транзакции +type TransactionID uint64 + +// TransactionState представляет состояние транзакции +type TransactionState int32 + +const ( + TransactionActive TransactionState = iota + TransactionCommitted + TransactionAborted +) + +// TransactionRecord представляет запись в WAL +type TransactionRecord struct { + ID TransactionID `msgpack:"id"` + State TransactionState `msgpack:"state"` + Timestamp int64 `msgpack:"timestamp"` + Operations []Operation `msgpack:"operations"` +} + +// Operation представляет одну операцию в транзакции +type Operation struct { + Type string `msgpack:"type"` // "insert", "update", "delete" + Database string `msgpack:"database"` + Collection string `msgpack:"collection"` + DocumentID string `msgpack:"document_id"` + Data map[string]interface{} `msgpack:"data"` + Version uint64 `msgpack:"version"` +} + +// DocumentVersion представляет версию документа для MVCC +type DocumentVersion struct { + Document *Document `msgpack:"document"` + Timestamp int64 `msgpack:"timestamp"` + TxID TransactionID `msgpack:"tx_id"` +} + +// TransactionInfo представляет информацию о транзакции для API +type TransactionInfo struct { + ID string `json:"id"` + Status string `json:"status"` + StartTime int64 `json:"start_time"` + OperationCount int `json:"operation_count"` +} + +// TransactionManager управляет транзакциями +type TransactionManager struct { + activeTransactions sync.Map // map[TransactionID]*Transaction + nextTxID atomic.Uint64 + wal *WriteAheadLog + mu sync.RWMutex +} + +// Transaction представляет одну транзакцию +type Transaction struct { + ID TransactionID + State atomic.Int32 + Operations []Operation + StartTime int64 + mu sync.RWMutex +} + +// WriteAheadLog реализует журнал предзаписи +type WriteAheadLog struct { + file *os.File + writeChan chan []byte + done chan struct{} + mu sync.RWMutex +} + +var ( + globalTxManager *TransactionManager + txManagerOnce sync.Once + currentTx atomic.Value // *Transaction +) + +// InitTransactionManager инициализирует глобальный менеджер транзакций +func InitTransactionManager(walPath string) error { + var err error + txManagerOnce.Do(func() { + globalTxManager = &TransactionManager{ + nextTxID: atomic.Uint64{}, + } + globalTxManager.nextTxID.Store(1) + err = globalTxManager.initWAL(walPath) + }) + return err +} + +// initWAL инициализирует Write-Ahead Log +func (tm *TransactionManager) initWAL(walPath string) error { + wal, err := NewWriteAheadLog(walPath) + if err != nil { + return err + } + tm.wal = wal + + // Восстанавливаем состояние из WAL при запуске + go tm.recoverFromWAL() + + return nil +} + +// NewWriteAheadLog создаёт новый WAL +func NewWriteAheadLog(path string) (*WriteAheadLog, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + + wal := &WriteAheadLog{ + file: file, + writeChan: make(chan []byte, 10000), + done: make(chan struct{}), + } + + go wal.writerLoop() + + return wal, nil +} + +// writerLoop асинхронно записывает данные в WAL +func (wal *WriteAheadLog) writerLoop() { + for data := range wal.writeChan { + wal.mu.Lock() + // Формат записи: [длина (4 байта)][данные] + lenBuf := make([]byte, 4) + binary.BigEndian.PutUint32(lenBuf, uint32(len(data))) + + if _, err := wal.file.Write(lenBuf); err != nil { + continue + } + if _, err := wal.file.Write(data); err != nil { + continue + } + wal.file.Sync() + wal.mu.Unlock() + } + close(wal.done) +} + +// Write записывает запись в WAL +func (wal *WriteAheadLog) Write(record *TransactionRecord) error { + data, err := serializer.Marshal(record) + if err != nil { + return err + } + + select { + case wal.writeChan <- data: + return nil + default: + return fmt.Errorf("WAL buffer full") + } +} + +// Close закрывает WAL +func (wal *WriteAheadLog) Close() error { + close(wal.writeChan) + <-wal.done + return wal.file.Close() +} + +// BeginTransaction начинает новую транзакцию +func BeginTransaction() *Transaction { + if globalTxManager == nil { + InitTransactionManager("futriis.wal") + } + + tx := &Transaction{ + ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1), + StartTime: time.Now().UnixMilli(), + Operations: make([]Operation, 0), + } + tx.State.Store(int32(TransactionActive)) + + globalTxManager.activeTransactions.Store(tx.ID, tx) + + // Сохраняем как текущую транзакцию для горутины + currentTx.Store(tx) + + // Записываем начало транзакции в WAL + record := &TransactionRecord{ + ID: tx.ID, + State: TransactionActive, + Timestamp: tx.StartTime, + Operations: tx.Operations, + } + if globalTxManager.wal != nil { + globalTxManager.wal.Write(record) + } + + return tx +} + +// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции +func BeginTransactionOnCollection(coll *Collection) error { + if globalTxManager == nil { + if err := InitTransactionManager("futriis.wal"); err != nil { + return err + } + } + + tx := BeginTransaction() + if tx == nil { + return fmt.Errorf("failed to create transaction") + } + + return nil +} + +// AddToTransaction добавляет операцию в текущую транзакцию +func AddToTransaction(coll *Collection, opType string, doc *Document) error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + if TransactionState(tx.State.Load()) != TransactionActive { + return fmt.Errorf("transaction is not active") + } + + op := Operation{ + Type: opType, + Database: coll.dbName, + Collection: coll.name, + DocumentID: doc.ID, + Data: doc.GetFields(), + Version: doc.Version, + } + + tx.mu.Lock() + tx.Operations = append(tx.Operations, op) + tx.mu.Unlock() + + return nil +} + +// CommitCurrentTransaction коммитит текущую транзакцию +func CommitCurrentTransaction() error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + if TransactionState(tx.State.Load()) != TransactionActive { + return fmt.Errorf("transaction is not active") + } + + // Применяем все операции атомарно + for _, op := range tx.Operations { + if err := applyOperation(op); err != nil { + // Откатываем при ошибке + AbortCurrentTransaction() + return fmt.Errorf("transaction commit failed: %v", err) + } + } + + tx.State.Store(int32(TransactionCommitted)) + + // Записываем коммит в WAL + record := &TransactionRecord{ + ID: tx.ID, + State: TransactionCommitted, + Timestamp: time.Now().UnixMilli(), + Operations: tx.Operations, + } + if globalTxManager.wal != nil { + globalTxManager.wal.Write(record) + } + + // Очищаем текущую транзакцию + currentTx.Store(nil) + globalTxManager.activeTransactions.Delete(tx.ID) + + return nil +} + +// AbortCurrentTransaction откатывает текущую транзакцию +func AbortCurrentTransaction() error { + txVal := currentTx.Load() + if txVal == nil { + return fmt.Errorf("no active transaction") + } + + tx := txVal.(*Transaction) + tx.State.Store(int32(TransactionAborted)) + + // Записываем откат в WAL + record := &TransactionRecord{ + ID: tx.ID, + State: TransactionAborted, + Timestamp: time.Now().UnixMilli(), + Operations: tx.Operations, + } + if globalTxManager.wal != nil { + globalTxManager.wal.Write(record) + } + + // Очищаем текущую транзакцию + currentTx.Store(nil) + globalTxManager.activeTransactions.Delete(tx.ID) + + return nil +} + +// HasActiveTransaction проверяет наличие активной транзакции +func HasActiveTransaction() bool { + return currentTx.Load() != nil +} + +// GetActiveTransactions возвращает список активных транзакций для API +func GetActiveTransactions() []TransactionInfo { + if globalTxManager == nil { + return []TransactionInfo{} + } + + transactions := make([]TransactionInfo, 0) + + globalTxManager.activeTransactions.Range(func(key, value interface{}) bool { + tx := value.(*Transaction) + status := "active" + if TransactionState(tx.State.Load()) == TransactionCommitted { + status = "committed" + } else if TransactionState(tx.State.Load()) == TransactionAborted { + status = "aborted" + } + + tx.mu.RLock() + opCount := len(tx.Operations) + tx.mu.RUnlock() + + transactions = append(transactions, TransactionInfo{ + ID: fmt.Sprintf("%d", tx.ID), + Status: status, + StartTime: tx.StartTime, + OperationCount: opCount, + }) + return true + }) + + return transactions +} + +// FindInTransaction ищет документ в контексте транзакции +func FindInTransaction(coll *Collection, id string) (*Document, error) { + txVal := currentTx.Load() + if txVal == nil { + return coll.Find(id) + } + + tx := txVal.(*Transaction) + + // Сначала ищем в операциях транзакции + tx.mu.RLock() + defer tx.mu.RUnlock() + + for i := len(tx.Operations) - 1; i >= 0; i-- { + op := tx.Operations[i] + if op.DocumentID == id { + if op.Type == "delete" { + return nil, fmt.Errorf("key not found") + } + // Создаём документ из данных операции + doc := NewDocumentWithID(op.DocumentID) + for k, v := range op.Data { + doc.SetField(k, v) + } + doc.Version = op.Version + return doc, nil + } + } + + // Ищем в основном хранилище + return coll.Find(id) +} + +// applyOperation применяет операцию к хранилищу +func applyOperation(op Operation) error { + // В реальной реализации здесь будет применение операции к соответствующей коллекции + // С использованием MVCC для версионирования + + switch op.Type { + case "insert": + // Проверяем версию документа (MVCC) + doc := NewDocumentWithID(op.DocumentID) + for k, v := range op.Data { + doc.SetField(k, v) + } + // Здесь должна быть вставка в коллекцию + return nil + case "update": + // Обновление с проверкой версии + return nil + case "delete": + // Удаление + return nil + } + + return nil +} + +// recoverFromWAL восстанавливает состояние из WAL после сбоя +func (tm *TransactionManager) recoverFromWAL() { + // В реальной реализации здесь будет чтение WAL и восстановление + // незавершённых транзакций +} + +// GetTransaction возвращает транзакцию по ID +func GetTransaction(id TransactionID) (*Transaction, bool) { + if globalTxManager == nil { + return nil, false + } + if val, ok := globalTxManager.activeTransactions.Load(id); ok { + return val.(*Transaction), true + } + return nil, false +} + +// MVCCSnapshot создаёт снапшот текущего состояния для MVCC +func MVCCSnapshot() uint64 { + return uint64(time.Now().UnixNano()) +} + +// CreateDocumentVersion создаёт новую версию документа для MVCC +func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion { + return &DocumentVersion{ + Document: doc.Clone(), + Timestamp: time.Now().UnixMilli(), + TxID: txID, + } +}