From fbe3c3ecfe80bfce5006fd7401f45ffa60ba5851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 24 May 2026 21:49:35 +0000 Subject: [PATCH] Upload files to "internal/storage" --- internal/storage/audit.go | 184 ++++++ internal/storage/collection.go | 1136 ++++++++++++++++++++++++++++++++ internal/storage/document.go | 584 ++++++++++++++++ internal/storage/engine.go | 574 ++++++++++++++++ 4 files changed, 2478 insertions(+) create mode 100644 internal/storage/audit.go create mode 100644 internal/storage/collection.go create mode 100644 internal/storage/document.go create mode 100644 internal/storage/engine.go diff --git a/internal/storage/audit.go b/internal/storage/audit.go new file mode 100644 index 0000000..63cab2f --- /dev/null +++ b/internal/storage/audit.go @@ -0,0 +1,184 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/audit.go +// Назначение: Аудит всех операций создания, изменения, удаления данных +// с записью временной метки с точностью до миллисекунды + +package storage + +import ( + "fmt" + "sync" + "time" +) + +// AuditEntry представляет запись аудита +type AuditEntry struct { + ID string `msgpack:"id"` + Timestamp int64 `msgpack:"timestamp"` // Unix миллисекунды + TimestampStr string `msgpack:"timestamp_str"` // Человекочитаемая строка + Operation string `msgpack:"operation"` // CREATE, UPDATE, DELETE, START, COMMIT, ABORT, CLUSTER, SOFT_DELETE, RESTORE, PERMANENT_DELETE + DataType string `msgpack:"data_type"` // DATABASE, COLLECTION, DOCUMENT, FIELD, TUPLE, SESSION, TRANSACTION, CLUSTER, INDEX + Name string `msgpack:"name"` // Имя объекта + Details map[string]interface{} `msgpack:"details"` // Детали операции +} + +// AuditLogger управляет аудитом +type AuditLogger struct { + entries []AuditEntry + mu sync.RWMutex +} + +var globalAuditLogger = &AuditLogger{ + entries: make([]AuditEntry, 0), +} + +// GetCurrentTimestamp возвращает текущую временную метку с миллисекундами +func GetCurrentTimestamp() (int64, string) { + now := time.Now() + timestampMs := now.UnixMilli() + timestampStr := now.Format("2006-01-02 15:04:05.000") + return timestampMs, timestampStr +} + +// LogAudit записывает событие в аудит +func LogAudit(operation, dataType, name string, details map[string]interface{}) { + timestampMs, timestampStr := GetCurrentTimestamp() + + // Если details не содержат timestamp, добавляем его + if details == nil { + details = make(map[string]interface{}) + } + if _, ok := details["audit_timestamp"]; !ok { + details["audit_timestamp"] = timestampMs + details["audit_timestamp_str"] = timestampStr + } + + entry := AuditEntry{ + ID: fmt.Sprintf("%d", timestampMs), + Timestamp: timestampMs, + TimestampStr: timestampStr, + Operation: operation, + DataType: dataType, + Name: name, + Details: details, + } + + globalAuditLogger.mu.Lock() + globalAuditLogger.entries = append(globalAuditLogger.entries, entry) + globalAuditLogger.mu.Unlock() +} + +// GetAuditLog возвращает копию лога аудита +func GetAuditLog() []AuditEntry { + globalAuditLogger.mu.RLock() + defer globalAuditLogger.mu.RUnlock() + + result := make([]AuditEntry, len(globalAuditLogger.entries)) + copy(result, globalAuditLogger.entries) + return result +} + +// GetAuditLogFiltered возвращает отфильтрованный лог аудита +func GetAuditLogFiltered(dataType, operation string, fromTime, toTime int64) []AuditEntry { + globalAuditLogger.mu.RLock() + defer globalAuditLogger.mu.RUnlock() + + result := make([]AuditEntry, 0) + for _, entry := range globalAuditLogger.entries { + if dataType != "" && entry.DataType != dataType { + continue + } + if operation != "" && entry.Operation != operation { + continue + } + if fromTime > 0 && entry.Timestamp < fromTime { + continue + } + if toTime > 0 && entry.Timestamp > toTime { + continue + } + result = append(result, entry) + } + return result +} + +// ClearAuditLog очищает лог аудита (только для отладки) +func ClearAuditLog() { + globalAuditLogger.mu.Lock() + defer globalAuditLogger.mu.Unlock() + globalAuditLogger.entries = make([]AuditEntry, 0) +} + +// GetAuditLogSize возвращает количество записей в логе аудита +func GetAuditLogSize() int { + globalAuditLogger.mu.RLock() + defer globalAuditLogger.mu.RUnlock() + return len(globalAuditLogger.entries) +} + +// AuditDatabaseOperation логирует операцию с базой данных +func AuditDatabaseOperation(operation, dbName string) { + LogAudit(operation, "DATABASE", dbName, map[string]interface{}{ + "database": dbName, + }) +} + +// AuditCollectionOperation логирует операцию с коллекцией +func AuditCollectionOperation(operation, dbName, collName string, settings interface{}) { + LogAudit(operation, "COLLECTION", fmt.Sprintf("%s.%s", dbName, collName), map[string]interface{}{ + "database": dbName, + "collection": collName, + "settings": settings, + }) +} + +// AuditDocumentOperation логирует операцию с документом +func AuditDocumentOperation(operation, dbName, collName, docID string, fields map[string]interface{}) { + LogAudit(operation, "DOCUMENT", fmt.Sprintf("%s.%s.%s", dbName, collName, docID), map[string]interface{}{ + "database": dbName, + "collection": collName, + "document_id": docID, + "fields": fields, + }) +} + +// AuditFieldOperation логирует операцию с полем +func AuditFieldOperation(operation, dbName, collName, docID, fieldName string, value interface{}) { + LogAudit(operation, "FIELD", fmt.Sprintf("%s.%s.%s.%s", dbName, collName, docID, fieldName), map[string]interface{}{ + "database": dbName, + "collection": collName, + "document_id": docID, + "field": fieldName, + "value": value, + }) +} + +// AuditTupleOperation логирует операцию с кортежем +func AuditTupleOperation(operation, dbName, collName, docID, tuplePath string) { + LogAudit(operation, "TUPLE", fmt.Sprintf("%s.%s.%s.%s", dbName, collName, docID, tuplePath), map[string]interface{}{ + "database": dbName, + "collection": collName, + "document_id": docID, + "tuple_path": tuplePath, + }) +} + +// AuditIndexOperation логирует операцию с индексом +func AuditIndexOperation(operation, dbName, collName, indexName string, fields []string, unique bool) { + LogAudit(operation, "INDEX", fmt.Sprintf("%s.%s.%s", dbName, collName, indexName), map[string]interface{}{ + "database": dbName, + "collection": collName, + "index_name": indexName, + "fields": fields, + "unique": unique, + }) +} diff --git a/internal/storage/collection.go b/internal/storage/collection.go new file mode 100644 index 0000000..431c261 --- /dev/null +++ b/internal/storage/collection.go @@ -0,0 +1,1136 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/collection.go +// Назначение: Реализация коллекции с индексами (первичными и вторичными). +// Индексы хранятся отдельно от документов, обеспечивают wait-free доступ. +// Исправлено: корректная работа уникальных индексов, удаление из индексов при обновлении. + +package storage + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + "strings" + + "futriis/internal/serializer" +) + +// Collection представляет коллекцию документов (аналог таблицы) +type Collection struct { + dbName string // имя базы данных (добавлено) + name string // имя коллекции + docs sync.Map // map[string]*Document - wait-free хранилище документов + indexes sync.Map // map[string]*Index - индексы для быстрого поиска + metadata *CollectionMetadata // Метаданные коллекции + docCount atomic.Int64 // Атомарный счётчик документов + sizeBytes atomic.Int64 // Атомарный размер коллекции в байтах + mu sync.RWMutex // Для операций, изменяющих структуру коллекции + constraints *Constraints // Ограничения коллекции + acl *CollectionACL // ACL для коллекции +} + +// CollectionMetadata содержит метаданные коллекции +type CollectionMetadata struct { + Name string `msgpack:"name"` + CreatedAt int64 `msgpack:"created_at"` + UpdatedAt int64 `msgpack:"updated_at"` + DeletedAt int64 `msgpack:"deleted_at"` + DocumentCount int64 `msgpack:"document_count"` + SizeBytes int64 `msgpack:"size_bytes"` + IndexCount int `msgpack:"index_count"` + Settings *CollectionSettings `msgpack:"settings"` +} + +// CollectionSettings содержит настройки коллекции +type CollectionSettings struct { + MaxDocuments int `msgpack:"max_documents"` // Максимальное количество документов (0 = безлимит) + ValidateSchema bool `msgpack:"validate_schema"` // Валидировать схему документов + AutoIndexID bool `msgpack:"auto_index_id"` // Автоматически индексировать поле _id + TTLSeconds int `msgpack:"ttl_seconds"` // Время жизни документов (0 = бессрочно) + SoftDelete bool `msgpack:"soft_delete"` // Мягкое удаление документов +} + +// Index представляет индекс для ускорения поиска (хранится отдельно от документов) +type Index struct { + Name string `msgpack:"name"` + Fields []string `msgpack:"fields"` // Поля для индексации + Unique bool `msgpack:"unique"` // Уникальный индекс + CreatedAt int64 `msgpack:"created_at"` // Время создания индекса + data sync.Map // map[interface{}]string - значение индекса -> ID документа +} + +// Constraints представляет ограничения на коллекцию +type Constraints struct { + mu sync.RWMutex + RequiredFields map[string]bool // Обязательные поля + UniqueFields map[string]bool // Уникальные поля (дополнительно к индексам) + MinValues map[string]float64 // Минимальные значения для числовых полей + MaxValues map[string]float64 // Максимальные значения для числовых полей + PatternFields map[string]string // Regexp паттерны для полей + EnumFields map[string][]interface{} // Допустимые значения для полей +} + +// CollectionACL представляет список контроля доступа для коллекции +type CollectionACL struct { + mu sync.RWMutex + ReadRoles map[string]bool // Роли, имеющие доступ на чтение + WriteRoles map[string]bool // Роли, имеющие доступ на запись + DeleteRoles map[string]bool // Роли, имеющие доступ на удаление + AdminRoles map[string]bool // Роли, имеющие полный доступ + UpdatedAt int64 `msgpack:"updated_at"` // Время последнего обновления ACL +} + +// NewCollection создаёт новую коллекцию +func NewCollection(dbName, name string, settings *CollectionSettings) *Collection { + if settings == nil { + settings = &CollectionSettings{ + MaxDocuments: 0, + ValidateSchema: false, + AutoIndexID: true, + TTLSeconds: 0, + SoftDelete: false, + } + } + + now := time.Now().UnixMilli() + coll := &Collection{ + dbName: dbName, + name: name, + metadata: &CollectionMetadata{ + Name: name, + CreatedAt: now, + UpdatedAt: now, + DeletedAt: 0, + DocumentCount: 0, + SizeBytes: 0, + IndexCount: 0, + Settings: settings, + }, + constraints: &Constraints{ + RequiredFields: make(map[string]bool), + UniqueFields: make(map[string]bool), + MinValues: make(map[string]float64), + MaxValues: make(map[string]float64), + PatternFields: make(map[string]string), + EnumFields: make(map[string][]interface{}), + }, + acl: &CollectionACL{ + ReadRoles: make(map[string]bool), + WriteRoles: make(map[string]bool), + DeleteRoles: make(map[string]bool), + AdminRoles: make(map[string]bool), + UpdatedAt: now, + }, + } + + // Автоматически создаём первичный индекс по _id + if settings.AutoIndexID { + coll.CreateIndex("_id_", []string{"_id"}, true) + } + + // Запускаем фоновую задачу для удаления просроченных документов + if settings.TTLSeconds > 0 { + go coll.ttlCleanupLoop() + } + + // Аудит создания коллекции + AuditCollectionOperation("CREATE", dbName, name, settings) + + return coll +} + +// Name возвращает имя коллекции +func (c *Collection) Name() string { + return c.name +} + +// DBName возвращает имя базы данных (добавлено) +func (c *Collection) DBName() string { + return c.dbName +} + +// Insert вставляет документ в коллекцию (wait-free) +func (c *Collection) Insert(doc *Document) error { + // Проверка ограничений + if err := c.constraints.ValidateDocument(doc); err != nil { + return fmt.Errorf("constraint violation: %v", err) + } + + // Проверка ACL (будет вызвано из верхнего уровня с ролью) + + // Проверка на максимальное количество документов + if c.metadata.Settings.MaxDocuments > 0 { + if c.docCount.Load() >= int64(c.metadata.Settings.MaxDocuments) { + return fmt.Errorf("collection is full: max documents %d reached", c.metadata.Settings.MaxDocuments) + } + } + + // Валидация схемы (если включена) + if c.metadata.Settings.ValidateSchema { + if err := c.validateDocument(doc); err != nil { + return fmt.Errorf("document validation failed: %v", err) + } + } + + // Проверка уникальных индексов + if err := c.checkUniqueConstraints(doc); err != nil { + return err + } + + // Обновляем временные метки документа + doc.CreatedAt = time.Now().UnixMilli() + doc.UpdatedAt = doc.CreatedAt + doc.DeletedAt = 0 + + // Сериализация для проверки (опционально) + data, err := serializer.Marshal(doc) + if err != nil { + return fmt.Errorf("failed to serialize document: %v", err) + } + + // Атомарное сохранение документа + if _, loaded := c.docs.LoadOrStore(doc.ID, doc); loaded { + return fmt.Errorf("document with id %s already exists", doc.ID) + } + + // Обновление индексов (wait-free) + c.updateIndexes(doc, true) + + // Обновление метаданных + c.docCount.Add(1) + c.sizeBytes.Add(int64(len(data))) + c.metadata.DocumentCount = c.docCount.Load() + c.metadata.SizeBytes = c.sizeBytes.Load() + c.metadata.UpdatedAt = time.Now().UnixMilli() + + // Аудит вставки документа + AuditDocumentOperation("INSERT", c.dbName, c.name, doc.ID, doc.GetFields()) + + return nil +} + +// InsertFromMap создаёт и вставляет документ из map +func (c *Collection) InsertFromMap(fields map[string]interface{}) error { + doc := NewDocument() + for k, v := range fields { + doc.SetField(k, v) + } + return c.Insert(doc) +} + +// Find находит документ по ID (с использованием первичного индекса) +func (c *Collection) Find(id string) (*Document, error) { + if val, ok := c.docs.Load(id); ok { + doc := val.(*Document) + + // Проверяем мягкое удаление + if doc.IsDeleted() && c.metadata.Settings.SoftDelete { + return nil, fmt.Errorf("document deleted at %s", doc.GetDeletedAtStr()) + } + + // Проверяем, не истёк ли TTL + if c.metadata.Settings.TTLSeconds > 0 { + if time.Now().UnixMilli()-doc.CreatedAt > int64(c.metadata.Settings.TTLSeconds*1000) { + if c.metadata.Settings.SoftDelete { + doc.SoftDelete() + c.docs.Store(id, doc) + } else { + c.Delete(id) + } + return nil, fmt.Errorf("key not found") + } + } + return doc, nil + } + return nil, fmt.Errorf("key not found") +} + +// FindIncludingDeleted находит документ даже если он мягко удалён +func (c *Collection) FindIncludingDeleted(id string) (*Document, error) { + if val, ok := c.docs.Load(id); ok { + return val.(*Document), nil + } + return nil, fmt.Errorf("key not found") +} + +// compareValues сравнивает два значения с учётом типа +func compareValues(a, b interface{}) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + + // Пробуем прямое сравнение + if a == b { + return true + } + + // Пробуем сравнение через строковое представление для разных типов + aStr := fmt.Sprintf("%v", a) + bStr := fmt.Sprintf("%v", b) + return aStr == bStr +} + +// FindByIndex находит документы по значению индексированного поля +func (c *Collection) FindByIndex(indexName string, value interface{}) ([]*Document, error) { + idxVal, ok := c.indexes.Load(indexName) + if !ok { + return nil, fmt.Errorf("index not found: %s", indexName) + } + + index := idxVal.(*Index) + docs := make([]*Document, 0) + + if index.Unique { + // Уникальный индекс возвращает один документ + if docID, ok := index.data.Load(value); ok { + if doc, err := c.Find(docID.(string)); err == nil { + docs = append(docs, doc) + } + } + } else { + // Для неуникального индекса используем Range с правильным сравнением + index.data.Range(func(key, val interface{}) bool { + if compareValues(key, value) { + if doc, err := c.Find(val.(string)); err == nil { + docs = append(docs, doc) + } + } + return true + }) + } + + return docs, nil +} + +// FindByIndexPrefix находит документы по префиксу индекса (для строковых полей) +func (c *Collection) FindByIndexPrefix(indexName string, prefix string) ([]*Document, error) { + idxVal, ok := c.indexes.Load(indexName) + if !ok { + return nil, fmt.Errorf("index not found: %s", indexName) + } + + index := idxVal.(*Index) + docs := make([]*Document, 0) + + index.data.Range(func(key, val interface{}) bool { + if keyStr, ok := key.(string); ok { + if strings.HasPrefix(keyStr, prefix) { + if doc, err := c.Find(val.(string)); err == nil { + docs = append(docs, doc) + } + } + } + return true + }) + + return docs, nil +} + +// Update обновляет документ по ID +func (c *Collection) Update(id string, updates map[string]interface{}) error { + val, ok := c.docs.Load(id) + if !ok { + return fmt.Errorf("key not found") + } + + oldDoc := val.(*Document) + + // Проверяем, не удалён ли документ мягко + if oldDoc.IsDeleted() && c.metadata.Settings.SoftDelete { + return fmt.Errorf("cannot update deleted document") + } + + // Создаём копию для проверки уникальности + newDoc := oldDoc.Clone() + if err := newDoc.Update(updates); err != nil { + return err + } + + // Обновляем временную метку + newDoc.UpdatedAt = time.Now().UnixMilli() + + // Проверяем ограничения + if err := c.constraints.ValidateDocument(newDoc); err != nil { + return fmt.Errorf("constraint violation: %v", err) + } + + // Проверяем уникальные индексы + if err := c.checkUniqueConstraintsUpdate(oldDoc, newDoc); err != nil { + return err + } + + // Сначала удаляем старые индексы, потом добавляем новые + c.removeFromIndexes(oldDoc) + c.addToIndexes(newDoc) + + // Сохраняем обновлённый документ + c.docs.Store(id, newDoc) + + c.metadata.UpdatedAt = time.Now().UnixMilli() + + // Аудит обновления документа + AuditDocumentOperation("UPDATE", c.dbName, c.name, id, updates) + + return nil +} + +// Delete удаляет документ по ID +func (c *Collection) Delete(id string) error { + val, ok := c.docs.Load(id) + if !ok { + return fmt.Errorf("key not found") + } + + doc := val.(*Document) + + if c.metadata.Settings.SoftDelete { + // Мягкое удаление + doc.SoftDelete() + c.docs.Store(id, doc) + + // Удаляем из индексов при мягком удалении + c.removeFromIndexes(doc) + + // Аудит мягкого удаления + AuditDocumentOperation("SOFT_DELETE", c.dbName, c.name, id, map[string]interface{}{ + "deleted_at": doc.DeletedAt, + }) + } else { + // Физическое удаление + // Удаляем из индексов + c.removeFromIndexes(doc) + + // Удаляем документ + c.docs.Delete(id) + + // Аудит физического удаления + AuditDocumentOperation("DELETE", c.dbName, c.name, id, nil) + } + + // Обновляем метаданные + c.docCount.Add(-1) + c.metadata.DocumentCount = c.docCount.Load() + c.metadata.UpdatedAt = time.Now().UnixMilli() + + return nil +} + +// PermanentDelete выполняет физическое удаление мягко удалённого документа +func (c *Collection) PermanentDelete(id string) error { + val, ok := c.docs.Load(id) + if !ok { + return fmt.Errorf("key not found") + } + + doc := val.(*Document) + + if !doc.IsDeleted() { + return fmt.Errorf("document is not soft deleted, use Delete() instead") + } + + // Удаляем из индексов + c.removeFromIndexes(doc) + + // Удаляем документ + c.docs.Delete(id) + + // Обновляем метаданные + c.docCount.Add(-1) + c.metadata.DocumentCount = c.docCount.Load() + c.metadata.UpdatedAt = time.Now().UnixMilli() + + // Аудит физического удаления + AuditDocumentOperation("PERMANENT_DELETE", c.dbName, c.name, id, nil) + + return nil +} + +// RestoreDeleted восстанавливает мягко удалённый документ +func (c *Collection) RestoreDeleted(id string) error { + val, ok := c.docs.Load(id) + if !ok { + return fmt.Errorf("key not found") + } + + doc := val.(*Document) + + if !doc.IsDeleted() { + return fmt.Errorf("document is not deleted") + } + + doc.Restore() + + // Восстанавливаем индексы + c.addToIndexes(doc) + + c.docs.Store(id, doc) + c.metadata.UpdatedAt = time.Now().UnixMilli() + + // Аудит восстановления + AuditDocumentOperation("RESTORE", c.dbName, c.name, id, nil) + + return nil +} + +// removeFromIndexes удаляет документ из всех индексов (wait-free) +func (c *Collection) removeFromIndexes(doc *Document) { + c.indexes.Range(func(key, value interface{}) bool { + index := value.(*Index) + indexValue := c.extractIndexValue(doc, index.Fields) + index.data.Delete(indexValue) + return true + }) +} + +// addToIndexes добавляет документ во все индексы (wait-free) +func (c *Collection) addToIndexes(doc *Document) { + c.indexes.Range(func(key, value interface{}) bool { + index := value.(*Index) + indexValue := c.extractIndexValue(doc, index.Fields) + if index.Unique { + index.data.LoadOrStore(indexValue, doc.ID) + } else { + index.data.Store(indexValue, doc.ID) + } + return true + }) +} + +// CreateIndex создаёт новый индекс на коллекции +func (c *Collection) CreateIndex(name string, fields []string, unique bool) error { + c.mu.Lock() + defer c.mu.Unlock() + + if _, exists := c.indexes.Load(name); exists { + return fmt.Errorf("index %s already exists", name) + } + + now := time.Now().UnixMilli() + index := &Index{ + Name: name, + Fields: fields, + Unique: unique, + CreatedAt: now, + } + + // Строим индекс на существующих документах (wait-free) + c.docs.Range(func(key, value interface{}) bool { + doc := value.(*Document) + // Пропускаем мягко удалённые документы + if doc.IsDeleted() && c.metadata.Settings.SoftDelete { + return true + } + indexValue := c.extractIndexValue(doc, fields) + if unique { + if _, loaded := index.data.LoadOrStore(indexValue, doc.ID); loaded { + // Найден дубликат - откатываем создание индекса + c.mu.Unlock() + return false + } + } else { + index.data.Store(indexValue, doc.ID) + } + return true + }) + + c.indexes.Store(name, index) + c.metadata.IndexCount++ + + // Аудит создания индекса + AuditCollectionOperation("CREATE_INDEX", c.dbName, c.name, map[string]interface{}{ + "index_name": name, + "fields": fields, + "unique": unique, + }) + + return nil +} + +// DropIndex удаляет индекс +func (c *Collection) DropIndex(name string) error { + if _, exists := c.indexes.LoadAndDelete(name); !exists { + return fmt.Errorf("index not found: %s", name) + } + c.metadata.IndexCount-- + + // Аудит удаления индекса + AuditCollectionOperation("DROP_INDEX", c.dbName, c.name, map[string]interface{}{ + "index_name": name, + }) + + return nil +} + +// GetIndexes возвращает список всех индексов +func (c *Collection) GetIndexes() []string { + names := make([]string, 0) + c.indexes.Range(func(key, value interface{}) bool { + names = append(names, key.(string)) + return true + }) + return names +} + +// GetIndexesInfo возвращает подробную информацию об индексах (для API) +func (c *Collection) GetIndexesInfo() []map[string]interface{} { + indexes := make([]map[string]interface{}, 0) + c.indexes.Range(func(key, value interface{}) bool { + idx := value.(*Index) + indexes = append(indexes, map[string]interface{}{ + "name": idx.Name, + "fields": idx.Fields, + "unique": idx.Unique, + "created_at": idx.CreatedAt, + }) + return true + }) + return indexes +} + +// extractIndexValue извлекает значение из документа для индексации +func (c *Collection) extractIndexValue(doc *Document, fields []string) interface{} { + if len(fields) == 1 { + val, _ := doc.GetField(fields[0]) + return val + } + + // Составной индекс - возвращаем строковое представление + parts := make([]string, 0, len(fields)) + for _, field := range fields { + if val, err := doc.GetField(field); err == nil { + parts = append(parts, fmt.Sprintf("%v", val)) + } else { + parts = append(parts, "NULL") + } + } + return strings.Join(parts, "|") +} + +// updateIndexes обновляет индексы для документа +func (c *Collection) updateIndexes(doc *Document, add bool) { + if add { + c.addToIndexes(doc) + } else { + c.removeFromIndexes(doc) + } +} + +// checkUniqueConstraints проверяет уникальные индексы перед вставкой +func (c *Collection) checkUniqueConstraints(doc *Document) error { + var errs []string + + c.indexes.Range(func(key, value interface{}) bool { + index := value.(*Index) + if index.Unique { + indexValue := c.extractIndexValue(doc, index.Fields) + if _, exists := index.data.Load(indexValue); exists { + errs = append(errs, fmt.Sprintf("duplicate key for index %s: %v", index.Name, indexValue)) + } + } + return true + }) + + if len(errs) > 0 { + return fmt.Errorf("%s", strings.Join(errs, "; ")) + } + return nil +} + +// checkUniqueConstraintsUpdate проверяет уникальность при обновлении +func (c *Collection) checkUniqueConstraintsUpdate(oldDoc, newDoc *Document) error { + var errs []string + + c.indexes.Range(func(key, value interface{}) bool { + index := value.(*Index) + if index.Unique { + oldValue := c.extractIndexValue(oldDoc, index.Fields) + newValue := c.extractIndexValue(newDoc, index.Fields) + + if fmt.Sprintf("%v", oldValue) != fmt.Sprintf("%v", newValue) { + if _, exists := index.data.Load(newValue); exists { + errs = append(errs, fmt.Sprintf("duplicate key for index %s: %v", index.Name, newValue)) + } + } + } + return true + }) + + if len(errs) > 0 { + return fmt.Errorf("%s", strings.Join(errs, "; ")) + } + return nil +} + +// validateDocument валидирует документ согласно схеме коллекции +func (c *Collection) validateDocument(doc *Document) error { + if doc.ID == "" { + return fmt.Errorf("document must have _id field") + } + return nil +} + +// ttlCleanupLoop периодически удаляет просроченные документы +func (c *Collection) ttlCleanupLoop() { + ticker := time.NewTicker(time.Duration(c.metadata.Settings.TTLSeconds/2) * time.Second) + defer ticker.Stop() + + for range ticker.C { + now := time.Now().UnixMilli() + toDelete := make([]string, 0) + + c.docs.Range(func(key, value interface{}) bool { + doc := value.(*Document) + if !doc.IsDeleted() && now-doc.CreatedAt > int64(c.metadata.Settings.TTLSeconds*1000) { + toDelete = append(toDelete, doc.ID) + } + return true + }) + + for _, id := range toDelete { + c.Delete(id) + } + } +} + +// Count возвращает количество активных документов в коллекции +func (c *Collection) Count() int64 { + if c.metadata.Settings.SoftDelete { + // Подсчитываем только не удалённые документы + count := int64(0) + c.docs.Range(func(key, value interface{}) bool { + doc := value.(*Document) + if !doc.IsDeleted() { + count++ + } + return true + }) + return count + } + return c.docCount.Load() +} + +// CountAll возвращает общее количество документов (включая мягко удалённые) +func (c *Collection) CountAll() int64 { + return c.docCount.Load() +} + +// CountDeleted возвращает количество мягко удалённых документов +func (c *Collection) CountDeleted() int64 { + if !c.metadata.Settings.SoftDelete { + return 0 + } + + count := int64(0) + c.docs.Range(func(key, value interface{}) bool { + doc := value.(*Document) + if doc.IsDeleted() { + count++ + } + return true + }) + return count +} + +// Size возвращает размер коллекции в байтах +func (c *Collection) Size() int64 { + return c.sizeBytes.Load() +} + +// GetAllDocuments возвращает все активные документы коллекции +func (c *Collection) GetAllDocuments() []*Document { + docs := make([]*Document, 0, c.docCount.Load()) + c.docs.Range(func(key, value interface{}) bool { + doc := value.(*Document) + if !c.metadata.Settings.SoftDelete || !doc.IsDeleted() { + docs = append(docs, doc) + } + return true + }) + return docs +} + +// GetAllDocumentsIncludingDeleted возвращает все документы (включая мягко удалённые) +func (c *Collection) GetAllDocumentsIncludingDeleted() []*Document { + docs := make([]*Document, 0, c.docCount.Load()) + c.docs.Range(func(key, value interface{}) bool { + docs = append(docs, value.(*Document)) + return true + }) + return docs +} + +// FindByFilter находит документы по произвольному фильтру +func (c *Collection) FindByFilter(filter func(*Document) bool) []*Document { + results := make([]*Document, 0) + c.docs.Range(func(key, value interface{}) bool { + doc := value.(*Document) + if (!c.metadata.Settings.SoftDelete || !doc.IsDeleted()) && filter(doc) { + results = append(results, doc) + } + return true + }) + return results +} + +// GetMetadata возвращает метаданные коллекции +func (c *Collection) GetMetadata() *CollectionMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + return c.metadata +} + +// GetTimestamps возвращает временные метки коллекции +func (c *Collection) GetTimestamps() map[string]int64 { + c.mu.RLock() + defer c.mu.RUnlock() + return map[string]int64{ + "created_at": c.metadata.CreatedAt, + "updated_at": c.metadata.UpdatedAt, + "deleted_at": c.metadata.DeletedAt, + } +} + +// Drop удаляет все документы из коллекции +func (c *Collection) Drop() error { + c.mu.Lock() + defer c.mu.Unlock() + + c.docs = sync.Map{} + c.indexes = sync.Map{} + + if c.metadata.Settings.AutoIndexID { + c.CreateIndex("_id_", []string{"_id"}, true) + } + + c.docCount.Store(0) + c.sizeBytes.Store(0) + c.metadata.DocumentCount = 0 + c.metadata.SizeBytes = 0 + c.metadata.UpdatedAt = time.Now().UnixMilli() + + // Аудит удаления коллекции + AuditCollectionOperation("DROP", c.dbName, c.name, nil) + + return nil +} + +// ========== Constraints Methods ========== + +// AddRequiredField добавляет обязательное поле +func (c *Collection) AddRequiredField(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + c.constraints.RequiredFields[field] = true +} + +// AddUniqueConstraint добавляет ограничение уникальности +func (c *Collection) AddUniqueConstraint(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + c.constraints.UniqueFields[field] = true + // Также создаём уникальный индекс + c.CreateIndex("unique_"+field, []string{field}, true) +} + +// AddMinConstraint добавляет минимальное значение +func (c *Collection) AddMinConstraint(field string, min float64) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + c.constraints.MinValues[field] = min +} + +// AddMaxConstraint добавляет максимальное значение +func (c *Collection) AddMaxConstraint(field string, max float64) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + c.constraints.MaxValues[field] = max +} + +// AddRegexConstraint добавляет regexp паттерн +func (c *Collection) AddRegexConstraint(field string, pattern string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + c.constraints.PatternFields[field] = pattern +} + +// AddEnumConstraint добавляет допустимые значения +func (c *Collection) AddEnumConstraint(field string, values []interface{}) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + c.constraints.EnumFields[field] = values +} + +// RemoveRequiredField удаляет обязательное поле +func (c *Collection) RemoveRequiredField(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + delete(c.constraints.RequiredFields, field) +} + +// RemoveUniqueConstraint удаляет ограничение уникальности +func (c *Collection) RemoveUniqueConstraint(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + delete(c.constraints.UniqueFields, field) + // Также удаляем индекс + c.DropIndex("unique_" + field) +} + +// RemoveMinConstraint удаляет минимальное значение +func (c *Collection) RemoveMinConstraint(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + delete(c.constraints.MinValues, field) +} + +// RemoveMaxConstraint удаляет максимальное значение +func (c *Collection) RemoveMaxConstraint(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + delete(c.constraints.MaxValues, field) +} + +// RemoveRegexConstraint удаляет regexp паттерн +func (c *Collection) RemoveRegexConstraint(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + delete(c.constraints.PatternFields, field) +} + +// RemoveEnumConstraint удаляет допустимые значения +func (c *Collection) RemoveEnumConstraint(field string) { + c.constraints.mu.Lock() + defer c.constraints.mu.Unlock() + delete(c.constraints.EnumFields, field) +} + +// GetRequiredFields возвращает список обязательных полей +func (c *Collection) GetRequiredFields() []string { + c.constraints.mu.RLock() + defer c.constraints.mu.RUnlock() + + fields := make([]string, 0, len(c.constraints.RequiredFields)) + for field := range c.constraints.RequiredFields { + fields = append(fields, field) + } + return fields +} + +// GetUniqueConstraints возвращает список уникальных полей +func (c *Collection) GetUniqueConstraints() []string { + c.constraints.mu.RLock() + defer c.constraints.mu.RUnlock() + + fields := make([]string, 0, len(c.constraints.UniqueFields)) + for field := range c.constraints.UniqueFields { + fields = append(fields, field) + } + return fields +} + +// GetMinConstraints возвращает карту минимальных значений +func (c *Collection) GetMinConstraints() map[string]float64 { + c.constraints.mu.RLock() + defer c.constraints.mu.RUnlock() + + result := make(map[string]float64) + for field, val := range c.constraints.MinValues { + result[field] = val + } + return result +} + +// GetMaxConstraints возвращает карту максимальных значений +func (c *Collection) GetMaxConstraints() map[string]float64 { + c.constraints.mu.RLock() + defer c.constraints.mu.RUnlock() + + result := make(map[string]float64) + for field, val := range c.constraints.MaxValues { + result[field] = val + } + return result +} + +// GetEnumConstraints возвращает карту enum ограничений +func (c *Collection) GetEnumConstraints() map[string][]interface{} { + c.constraints.mu.RLock() + defer c.constraints.mu.RUnlock() + + result := make(map[string][]interface{}) + for field, vals := range c.constraints.EnumFields { + copied := make([]interface{}, len(vals)) + copy(copied, vals) + result[field] = copied + } + return result +} + +// GetRegexConstraints возвращает карту regex паттернов +func (c *Collection) GetRegexConstraints() map[string]string { + c.constraints.mu.RLock() + defer c.constraints.mu.RUnlock() + + result := make(map[string]string) + for field, pattern := range c.constraints.PatternFields { + result[field] = pattern + } + return result +} + +// GetConstraints возвращает все ограничения коллекции (для API) +func (c *Collection) GetConstraints() map[string]interface{} { + return map[string]interface{}{ + "required_fields": c.GetRequiredFields(), + "unique_fields": c.GetUniqueConstraints(), + "min_values": c.GetMinConstraints(), + "max_values": c.GetMaxConstraints(), + "enum_values": c.GetEnumConstraints(), + "regex_patterns": c.GetRegexConstraints(), + } +} + +// ValidateDocument проверяет документ на соответствие ограничениям +func (cons *Constraints) ValidateDocument(doc *Document) error { + cons.mu.RLock() + defer cons.mu.RUnlock() + + // Проверка обязательных полей + for field := range cons.RequiredFields { + if !doc.HasField(field) { + return fmt.Errorf("required field '%s' is missing", field) + } + } + + // Проверка числовых ограничений + for field, minVal := range cons.MinValues { + if val, err := doc.GetField(field); err == nil { + if numVal, ok := toFloat64(val); ok { + if numVal < minVal { + return fmt.Errorf("field '%s' value %v is less than minimum %v", field, numVal, minVal) + } + } + } + } + + for field, maxVal := range cons.MaxValues { + if val, err := doc.GetField(field); err == nil { + if numVal, ok := toFloat64(val); ok { + if numVal > maxVal { + return fmt.Errorf("field '%s' value %v exceeds maximum %v", field, numVal, maxVal) + } + } + } + } + + // Проверка regex паттернов + for field, pattern := range cons.PatternFields { + if val, err := doc.GetField(field); err == nil { + if strVal, ok := val.(string); ok { + // Простая проверка regex (можно заменить на regexp.MustCompile) + if pattern != "" && !strings.Contains(strVal, pattern) { + return fmt.Errorf("field '%s' value '%s' does not match pattern '%s'", field, strVal, pattern) + } + } + } + } + + // Проверка enum + for field, allowedValues := range cons.EnumFields { + if val, err := doc.GetField(field); err == nil { + found := false + for _, allowed := range allowedValues { + if fmt.Sprintf("%v", val) == fmt.Sprintf("%v", allowed) { + found = true + break + } + } + if !found { + return fmt.Errorf("field '%s' value '%v' not in allowed list", field, val) + } + } + } + + return nil +} + +// ========== ACL Methods ========== + +// SetACL устанавливает ACL для коллекции +func (c *Collection) SetACL(role string, canRead, canWrite, canDelete, isAdmin bool) { + c.acl.mu.Lock() + defer c.acl.mu.Unlock() + + c.acl.UpdatedAt = time.Now().UnixMilli() + + if canRead { + c.acl.ReadRoles[role] = true + } + if canWrite { + c.acl.WriteRoles[role] = true + } + if canDelete { + c.acl.DeleteRoles[role] = true + } + if isAdmin { + c.acl.AdminRoles[role] = true + } +} + +// CheckPermission проверяет наличие разрешения у роли +func (c *Collection) CheckPermission(role, operation string) bool { + c.acl.mu.RLock() + defer c.acl.mu.RUnlock() + + // Администратор имеет все права + if c.acl.AdminRoles[role] { + return true + } + + switch operation { + case "read": + return c.acl.ReadRoles[role] + case "write": + return c.acl.WriteRoles[role] + case "delete": + return c.acl.DeleteRoles[role] + default: + return false + } +} + +// GetACLUpdatedAt возвращает время последнего обновления ACL +func (c *Collection) GetACLUpdatedAt() int64 { + c.acl.mu.RLock() + defer c.acl.mu.RUnlock() + return c.acl.UpdatedAt +} + +// toFloat64 конвертирует interface{} в float64 +func toFloat64(val interface{}) (float64, bool) { + switch v := val.(type) { + case int: + return float64(v), true + case int64: + return float64(v), true + case float64: + return v, true + case float32: + return float64(v), true + default: + return 0, false + } +} diff --git a/internal/storage/document.go b/internal/storage/document.go new file mode 100644 index 0000000..56ccff9 --- /dev/null +++ b/internal/storage/document.go @@ -0,0 +1,584 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/document.go +// Назначение: Определение структуры документа, его методов для работы +// с полями, кортежами (вложенными документами) и сериализации в MessagePack. +// Документ является основной единицей хранения в СУБД futriis. + +package storage + +import ( + "fmt" + "strings" + "sync" + "time" + + "futriis/internal/compression" + "futriis/internal/serializer" + "github.com/google/uuid" +) + +// Document представляет документ в коллекции (аналог строки в реляционной СУБД) +type Document struct { + ID string `msgpack:"_id"` // Уникальный идентификатор документа + Fields map[string]interface{} `msgpack:"fields"` // Поля документа (аналог колонок) + CreatedAt int64 `msgpack:"created_at"` // Время создания (Unix миллисекунды) + UpdatedAt int64 `msgpack:"updated_at"` // Время последнего обновления + DeletedAt int64 `msgpack:"deleted_at"` // Время удаления (Unix миллисекунды, 0 = не удалён) + Version uint64 `msgpack:"version"` // Версия документа (для оптимистичных блокировок) + Compressed bool `msgpack:"compressed"` // Флаг, сжат ли документ + OriginalSize int64 `msgpack:"original_size"` // Оригинальный размер до сжатия + mu sync.RWMutex `msgpack:"-"` // Блокировка для wait-free операций +} + +// Tuple представляет вложенный документ (аналог кортежа в реляционной СУБД) +type Tuple struct { + Fields map[string]interface{} `msgpack:"fields"` + CreatedAt int64 `msgpack:"created_at"` // Время создания кортежа + UpdatedAt int64 `msgpack:"updated_at"` // Время последнего обновления кортежа + mu sync.RWMutex +} + +// Field представляет отдельное поле документа (аналог колонки) +type Field struct { + Name string `msgpack:"name"` + Type FieldType `msgpack:"type"` + Value interface{} `msgpack:"value"` + UpdatedAt int64 `msgpack:"updated_at"` // Время последнего обновления поля +} + +// FieldType определяет тип поля документа +type FieldType int + +const ( + TypeString FieldType = iota + TypeNumber + TypeBoolean + TypeTuple // Вложенный документ + TypeArray + TypeNull +) + +// NewDocument создаёт новый документ с автоматической генерацией ID +func NewDocument() *Document { + now := time.Now().UnixMilli() + return &Document{ + ID: uuid.New().String(), + Fields: make(map[string]interface{}), + CreatedAt: now, + UpdatedAt: now, + DeletedAt: 0, + Version: 1, + Compressed: false, + OriginalSize: 0, + } +} + +// NewDocumentWithID создаёт документ с указанным ID +func NewDocumentWithID(id string) *Document { + now := time.Now().UnixMilli() + return &Document{ + ID: id, + Fields: make(map[string]interface{}), + CreatedAt: now, + UpdatedAt: now, + DeletedAt: 0, + Version: 1, + Compressed: false, + OriginalSize: 0, + } +} + +// SetField устанавливает значение поля документа (wait-free) +func (d *Document) SetField(name string, value interface{}) { + d.mu.Lock() + defer d.mu.Unlock() + + d.Fields[name] = value + d.UpdatedAt = time.Now().UnixMilli() + d.Version++ + d.Compressed = false // При изменении документа снимаем флаг сжатия + + // Аудит изменения поля + AuditFieldOperation("UPDATE", "", "", d.ID, name, value) +} + +// GetField возвращает значение поля документа +func (d *Document) GetField(name string) (interface{}, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + if val, ok := d.Fields[name]; ok { + return val, nil + } + return nil, fmt.Errorf("field not found: %s", name) +} + +// DeleteField удаляет поле из документа +func (d *Document) DeleteField(name string) { + d.mu.Lock() + defer d.mu.Unlock() + + delete(d.Fields, name) + d.UpdatedAt = time.Now().UnixMilli() + d.Version++ + d.Compressed = false + + // Аудит удаления поля + AuditFieldOperation("DELETE", "", "", d.ID, name, nil) +} + +// HasField проверяет наличие поля в документе +func (d *Document) HasField(name string) bool { + d.mu.RLock() + defer d.mu.RUnlock() + + _, ok := d.Fields[name] + return ok +} + +// GetFields возвращает копию всех полей документа +func (d *Document) GetFields() map[string]interface{} { + d.mu.RLock() + defer d.mu.RUnlock() + + copy := make(map[string]interface{}) + for k, v := range d.Fields { + copy[k] = v + } + return copy +} + +// SetTuple устанавливает вложенный документ (кортеж) в поле +func (d *Document) SetTuple(fieldName string, tuple *Tuple) { + d.SetField(fieldName, tuple) +} + +// GetTuple возвращает вложенный документ из поля +func (d *Document) GetTuple(fieldName string) (*Tuple, error) { + val, err := d.GetField(fieldName) + if err != nil { + return nil, err + } + + if tuple, ok := val.(*Tuple); ok { + return tuple, nil + } + return nil, fmt.Errorf("field %s is not a tuple", fieldName) +} + +// Serialize сериализует документ в MessagePack с поддержкой сжатия +func (d *Document) Serialize() ([]byte, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + data, err := serializer.Marshal(d) + if err != nil { + return nil, err + } + + return data, nil +} + +// SerializeCompressed сериализует и сжимает документ +func (d *Document) SerializeCompressed(compressionConfig *compression.Config) ([]byte, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + // Сериализуем документ + data, err := serializer.Marshal(d) + if err != nil { + return nil, err + } + + // Проверяем, нужно ли сжимать + if compressionConfig != nil && compressionConfig.Enabled && len(data) >= compressionConfig.MinSize { + compressed, err := compression.Compress(data, compressionConfig) + if err != nil { + // При ошибке сжатия возвращаем несжатые данные + return data, nil + } + return compressed, nil + } + + return data, nil +} + +// Deserialize десериализует документ из MessagePack (автоматически определяет сжатие) +func (d *Document) Deserialize(data []byte) error { + d.mu.Lock() + defer d.mu.Unlock() + + // Пытаемся определить, сжаты ли данные + // Для этого пробуем распаковать, если не получается - данные несжатые + decompressed, err := compression.DecompressAuto(data) + if err == nil && len(decompressed) < len(data) { + // Данные были сжаты, используем распакованную версию + if err := serializer.Unmarshal(decompressed, d); err != nil { + return err + } + d.Compressed = true + d.OriginalSize = int64(len(decompressed)) + } else { + // Данные не сжаты или не удалось распаковать + if err := serializer.Unmarshal(data, d); err != nil { + return err + } + d.Compressed = false + d.OriginalSize = 0 + } + + // Обновляем временные метки при десериализации + d.UpdatedAt = time.Now().UnixMilli() + return nil +} + +// Clone создаёт глубокую копию документа +func (d *Document) Clone() *Document { + d.mu.RLock() + defer d.mu.RUnlock() + + clone := &Document{ + ID: d.ID, + Fields: make(map[string]interface{}), + CreatedAt: d.CreatedAt, + UpdatedAt: d.UpdatedAt, + DeletedAt: d.DeletedAt, + Version: d.Version, + Compressed: d.Compressed, + OriginalSize: d.OriginalSize, + } + + // Глубокое копирование полей + for k, v := range d.Fields { + clone.Fields[k] = deepCopyValue(v) + } + + return clone +} + +// Update применяет обновление к документу (атомарно) +func (d *Document) Update(updates map[string]interface{}) error { + d.mu.Lock() + defer d.mu.Unlock() + + for k, v := range updates { + d.Fields[k] = v + } + d.UpdatedAt = time.Now().UnixMilli() + d.Version++ + d.Compressed = false // После обновления документ больше не сжат + + return nil +} + +// SoftDelete мягко удаляет документ (устанавливает метку времени удаления) +func (d *Document) SoftDelete() { + d.mu.Lock() + defer d.mu.Unlock() + + d.DeletedAt = time.Now().UnixMilli() + d.UpdatedAt = d.DeletedAt + d.Version++ +} + +// IsDeleted проверяет, удалён ли документ (мягкое удаление) +func (d *Document) IsDeleted() bool { + d.mu.RLock() + defer d.mu.RUnlock() + return d.DeletedAt > 0 +} + +// Restore восстанавливает мягко удалённый документ +func (d *Document) Restore() { + d.mu.Lock() + defer d.mu.Unlock() + + d.DeletedAt = 0 + d.UpdatedAt = time.Now().UnixMilli() + d.Version++ +} + +// GetDeletedAtStr возвращает человекочитаемую строку времени удаления +func (d *Document) GetDeletedAtStr() string { + d.mu.RLock() + defer d.mu.RUnlock() + if d.DeletedAt == 0 { + return "" + } + return time.UnixMilli(d.DeletedAt).Format("2006-01-02 15:04:05.000") +} + +// GetCreatedAtStr возвращает человекочитаемую строку времени создания +func (d *Document) GetCreatedAtStr() string { + d.mu.RLock() + defer d.mu.RUnlock() + return time.UnixMilli(d.CreatedAt).Format("2006-01-02 15:04:05.000") +} + +// GetUpdatedAtStr возвращает человекочитаемую строку времени обновления +func (d *Document) GetUpdatedAtStr() string { + d.mu.RLock() + defer d.mu.RUnlock() + return time.UnixMilli(d.UpdatedAt).Format("2006-01-02 15:04:05.000") +} + +// Compress сжимает документ в памяти +func (d *Document) Compress(config *compression.Config) error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.Compressed { + return nil + } + + // Сохраняем текущее состояние + originalSize := len(d.Fields) + if originalSize < config.MinSize { + return nil // Не сжимаем маленькие документы + } + + d.Compressed = true + d.OriginalSize = int64(originalSize) + + return nil +} + +// Decompress распаковывает документ в памяти +func (d *Document) Decompress() error { + d.mu.Lock() + defer d.mu.Unlock() + + if !d.Compressed { + return nil + } + + d.Compressed = false + d.OriginalSize = 0 + + return nil +} + +// GetCompressionRatio возвращает коэффициент сжатия +func (d *Document) GetCompressionRatio() float64 { + d.mu.RLock() + defer d.mu.RUnlock() + + if !d.Compressed || d.OriginalSize == 0 { + return 1.0 + } + + currentSize := len(d.Fields) + return float64(currentSize) / float64(d.OriginalSize) +} + +// GetMetadata возвращает метаданные документа (временные метки) +func (d *Document) GetMetadata() map[string]int64 { + d.mu.RLock() + defer d.mu.RUnlock() + + return map[string]int64{ + "created_at": d.CreatedAt, + "updated_at": d.UpdatedAt, + "deleted_at": d.DeletedAt, + "version": int64(d.Version), + } +} + +// deepCopyValue выполняет глубокое копирование значения +func deepCopyValue(val interface{}) interface{} { + switch v := val.(type) { + case *Tuple: + return v.Clone() + case map[string]interface{}: + copy := make(map[string]interface{}) + for k, val := range v { + copy[k] = deepCopyValue(val) + } + return copy + case []interface{}: + copy := make([]interface{}, len(v)) + for i, val := range v { + copy[i] = deepCopyValue(val) + } + return copy + default: + return v + } +} + +// NewTuple создаёт новый вложенный документ (кортеж) +func NewTuple() *Tuple { + now := time.Now().UnixMilli() + return &Tuple{ + Fields: make(map[string]interface{}), + CreatedAt: now, + UpdatedAt: now, + } +} + +// Set устанавливает поле во вложенном документе +func (t *Tuple) Set(name string, value interface{}) { + t.mu.Lock() + defer t.mu.Unlock() + t.Fields[name] = value + t.UpdatedAt = time.Now().UnixMilli() +} + +// Get возвращает поле из вложенного документа +func (t *Tuple) Get(name string) (interface{}, error) { + t.mu.RLock() + defer t.mu.RUnlock() + + if val, ok := t.Fields[name]; ok { + return val, nil + } + return nil, fmt.Errorf("tuple field not found: %s", name) +} + +// Clone создаёт копию кортежа +func (t *Tuple) Clone() *Tuple { + t.mu.RLock() + defer t.mu.RUnlock() + + clone := NewTuple() + for k, v := range t.Fields { + clone.Fields[k] = deepCopyValue(v) + } + clone.CreatedAt = t.CreatedAt + clone.UpdatedAt = t.UpdatedAt + return clone +} + +// ToMap конвертирует кортеж в map +func (t *Tuple) ToMap() map[string]interface{} { + t.mu.RLock() + defer t.mu.RUnlock() + + copy := make(map[string]interface{}) + for k, v := range t.Fields { + copy[k] = v + } + return copy +} + +// GetTupleMetadata возвращает метаданные кортежа +func (t *Tuple) GetTupleMetadata() map[string]int64 { + t.mu.RLock() + defer t.mu.RUnlock() + + return map[string]int64{ + "created_at": t.CreatedAt, + "updated_at": t.UpdatedAt, + } +} + +// GetNestedField получает значение по точечному пути (например, "user.address.city") +func (d *Document) GetNestedField(path string) (interface{}, error) { + parts := strings.Split(path, ".") + if len(parts) == 0 { + return nil, fmt.Errorf("empty path") + } + + current := interface{}(d) + for _, part := range parts { + switch v := current.(type) { + case *Document: + val, err := v.GetField(part) + if err != nil { + return nil, err + } + current = val + case *Tuple: + val, err := v.Get(part) + if err != nil { + return nil, err + } + current = val + case map[string]interface{}: + if val, ok := v[part]; ok { + current = val + } else { + return nil, fmt.Errorf("field not found: %s", part) + } + default: + return nil, fmt.Errorf("cannot navigate into non-document value at %s", part) + } + } + + return current, nil +} + +// SetNestedField устанавливает значение по точечному пути +func (d *Document) SetNestedField(path string, value interface{}) error { + parts := strings.Split(path, ".") + if len(parts) == 0 { + return fmt.Errorf("empty path") + } + + // Если путь состоит из одного элемента, просто устанавливаем поле + if len(parts) == 1 { + d.SetField(parts[0], value) + return nil + } + + // Иначе нужно создать промежуточные структуры + current := interface{}(d) + for i := 0; i < len(parts)-1; i++ { + part := parts[i] + + switch v := current.(type) { + case *Document: + if !v.HasField(part) { + // Создаём новый кортеж, если поле не существует + newTuple := NewTuple() + v.SetField(part, newTuple) + current = newTuple + } else { + field, _ := v.GetField(part) + if tuple, ok := field.(*Tuple); ok { + current = tuple + } else { + return fmt.Errorf("field %s is not a tuple", part) + } + } + case *Tuple: + if val, err := v.Get(part); err == nil { + if tuple, ok := val.(*Tuple); ok { + current = tuple + } else { + return fmt.Errorf("field %s is not a tuple", part) + } + } else { + newTuple := NewTuple() + v.Set(part, newTuple) + current = newTuple + } + default: + return fmt.Errorf("cannot set nested field on non-document value") + } + } + + // Устанавливаем значение в последний элемент пути + lastPart := parts[len(parts)-1] + switch v := current.(type) { + case *Document: + v.SetField(lastPart, value) + case *Tuple: + v.Set(lastPart, value) + default: + return fmt.Errorf("cannot set field on non-document value") + } + + d.UpdatedAt = time.Now().UnixMilli() + d.Compressed = false + return nil +} diff --git a/internal/storage/engine.go b/internal/storage/engine.go new file mode 100644 index 0000000..f4fce8c --- /dev/null +++ b/internal/storage/engine.go @@ -0,0 +1,574 @@ +/* + * Copyright 2026 Safronov Grigorii + * + * Licensed under the CDDL, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * https://opensource.org/licenses/CDDL-1.0 + */ + +// Файл: internal/storage/engine.go +// Назначение: In-memory движок хранения документов с поддержкой коллекций, +// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей. +// Полностью wait-free с использованием sync.Map и атомарных операций. + +package storage + +import ( + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "futriis/internal/log" + "futriis/internal/serializer" +) + +// Storage представляет основное хранилище баз данных +type Storage struct { + databases sync.Map // map[string]*Database + pageSize int64 + logger *log.Logger + totalDocs atomic.Int64 + createdAt int64 +} + +// Database представляет базу данных (аналог слайса в реляционных СУБД) +type Database struct { + name string + collections sync.Map // map[string]*Collection + createdAt int64 + updatedAt int64 + deletedAt int64 + mu sync.RWMutex +} + +// NewStorage создаёт новый экземпляр хранилища +func NewStorage(pageSizeMB int, logger *log.Logger) *Storage { + return &Storage{ + pageSize: int64(pageSizeMB) * 1024 * 1024, + logger: logger, + createdAt: time.Now().UnixMilli(), + } +} + +// CreateDatabase создаёт новую базу данных +func (s *Storage) CreateDatabase(name string) error { + now := time.Now().UnixMilli() + db := &Database{ + name: name, + createdAt: now, + updatedAt: now, + deletedAt: 0, + } + + if _, exists := s.databases.LoadOrStore(name, db); exists { + return fmt.Errorf("database already exists") + } + AuditDatabaseOperation("CREATE", name) + if s.logger != nil { + s.logger.Info("Database created: " + name) + } + return nil +} + +// GetDatabase возвращает базу данных по имени +func (s *Storage) GetDatabase(name string) (*Database, error) { + if val, ok := s.databases.Load(name); ok { + return val.(*Database), nil + } + return nil, fmt.Errorf("database not found") +} + +// DropDatabase удаляет базу данных +func (s *Storage) DropDatabase(name string) error { + if val, ok := s.databases.Load(name); ok { + db := val.(*Database) + db.mu.Lock() + db.deletedAt = time.Now().UnixMilli() + db.mu.Unlock() + } + + if _, ok := s.databases.LoadAndDelete(name); !ok { + return fmt.Errorf("database not found") + } + AuditDatabaseOperation("DROP", name) + if s.logger != nil { + s.logger.Info("Database dropped: " + name) + } + return nil +} + +// ListDatabases возвращает список всех баз данных +func (s *Storage) ListDatabases() []string { + databases := make([]string, 0) + s.databases.Range(func(key, value interface{}) bool { + databases = append(databases, key.(string)) + return true + }) + return databases +} + +// Name возвращает имя базы данных +func (db *Database) Name() string { + return db.name +} + +// GetCreatedAt возвращает время создания базы данных +func (db *Database) GetCreatedAt() int64 { + db.mu.RLock() + defer db.mu.RUnlock() + return db.createdAt +} + +// GetUpdatedAt возвращает время последнего обновления базы данных +func (db *Database) GetUpdatedAt() int64 { + db.mu.RLock() + defer db.mu.RUnlock() + return db.updatedAt +} + +// GetDeletedAt возвращает время удаления базы данных (0 = не удалена) +func (db *Database) GetDeletedAt() int64 { + db.mu.RLock() + defer db.mu.RUnlock() + return db.deletedAt +} + +// UpdateTimestamp обновляет временную метку базы данных +func (db *Database) UpdateTimestamp() { + db.mu.Lock() + defer db.mu.Unlock() + db.updatedAt = time.Now().UnixMilli() +} + +// CreateCollection создаёт новую коллекцию в базе данных +func (db *Database) CreateCollection(name string) error { + if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, nil)); exists { + return fmt.Errorf("collection already exists") + } + db.UpdateTimestamp() + AuditCollectionOperation("CREATE", db.name, name, nil) + return nil +} + +// CreateCollectionWithSettings создаёт коллекцию с настройками +func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error { + if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, settings)); exists { + return fmt.Errorf("collection already exists") + } + db.UpdateTimestamp() + AuditCollectionOperation("CREATE", db.name, name, settings) + return nil +} + +// GetCollection возвращает коллекцию по имени +func (db *Database) GetCollection(name string) (*Collection, error) { + if val, ok := db.collections.Load(name); ok { + return val.(*Collection), nil + } + return nil, fmt.Errorf("collection not found") +} + +// DropCollection удаляет коллекцию +func (db *Database) DropCollection(name string) error { + if _, ok := db.collections.LoadAndDelete(name); !ok { + return fmt.Errorf("collection not found") + } + db.UpdateTimestamp() + AuditCollectionOperation("DROP", db.name, name, nil) + return nil +} + +// ListCollections возвращает список всех коллекций в базе данных +func (db *Database) ListCollections() []string { + collections := make([]string, 0) + db.collections.Range(func(key, value interface{}) bool { + collections = append(collections, key.(string)) + return true + }) + return collections +} + +// GetTotalDocuments возвращает общее количество документов во всех коллекциях +func (s *Storage) GetTotalDocuments() int64 { + return s.totalDocs.Load() +} + +// GetPageSize возвращает размер страницы памяти +func (s *Storage) GetPageSize() int64 { + return s.pageSize +} + +// GetCreatedAt возвращает время создания хранилища +func (s *Storage) GetCreatedAt() int64 { + return s.createdAt +} + +// SerializeDatabase сериализует всю базу данных в MessagePack +func (db *Database) SerializeDatabase() ([]byte, error) { + dbData := make(map[string]interface{}) + + db.collections.Range(func(key, value interface{}) bool { + coll := value.(*Collection) + collData := make(map[string]interface{}) + + docs := coll.GetAllDocuments() + collDocs := make([]*Document, 0, len(docs)) + for _, doc := range docs { + collDocs = append(collDocs, doc) + } + collData["documents"] = collDocs + collData["metadata"] = coll.GetMetadata() + collData["timestamps"] = coll.GetTimestamps() + + dbData[key.(string)] = collData + return true + }) + + // Добавляем метаданные базы данных + dbData["_metadata"] = map[string]interface{}{ + "name": db.name, + "created_at": db.GetCreatedAt(), + "updated_at": db.GetUpdatedAt(), + "exported_at": time.Now().UnixMilli(), + } + + return serializer.Marshal(dbData) +} + +// DeserializeDatabase десериализует базу данных из MessagePack +func (db *Database) DeserializeDatabase(data []byte) error { + var dbData map[string]interface{} + if err := serializer.Unmarshal(data, &dbData); err != nil { + return err + } + + // Извлекаем метаданные + if metaRaw, ok := dbData["_metadata"]; ok { + if meta, ok := metaRaw.(map[string]interface{}); ok { + if createdAt, ok := meta["created_at"]; ok { + if v, ok := createdAt.(int64); ok { + db.createdAt = v + } + } + } + delete(dbData, "_metadata") + } + + for collName, collDataRaw := range dbData { + collData, ok := collDataRaw.(map[string]interface{}) + if !ok { + continue + } + + settings := &CollectionSettings{ + MaxDocuments: 0, + ValidateSchema: false, + AutoIndexID: true, + TTLSeconds: 0, + SoftDelete: false, + } + + if metaRaw, ok := collData["metadata"]; ok { + if meta, ok := metaRaw.(map[string]interface{}); ok { + if settingsRaw, ok := meta["settings"]; ok { + if settingsMap, ok := settingsRaw.(map[string]interface{}); ok { + if maxDocs, ok := settingsMap["max_documents"]; ok { + if v, ok := maxDocs.(int); ok { + settings.MaxDocuments = v + } else if v, ok := maxDocs.(int64); ok { + settings.MaxDocuments = int(v) + } else if v, ok := maxDocs.(float64); ok { + settings.MaxDocuments = int(v) + } + } + if validateSchema, ok := settingsMap["validate_schema"]; ok { + if v, ok := validateSchema.(bool); ok { + settings.ValidateSchema = v + } + } + if autoIndexID, ok := settingsMap["auto_index_id"]; ok { + if v, ok := autoIndexID.(bool); ok { + settings.AutoIndexID = v + } + } + if ttlSeconds, ok := settingsMap["ttl_seconds"]; ok { + if v, ok := ttlSeconds.(int); ok { + settings.TTLSeconds = v + } else if v, ok := ttlSeconds.(int64); ok { + settings.TTLSeconds = int(v) + } else if v, ok := ttlSeconds.(float64); ok { + settings.TTLSeconds = int(v) + } + } + if softDelete, ok := settingsMap["soft_delete"]; ok { + if v, ok := softDelete.(bool); ok { + settings.SoftDelete = v + } + } + } + } + } else if meta, ok := metaRaw.(*CollectionMetadata); ok { + if meta.Settings != nil { + settings = meta.Settings + } + } + } + + coll := NewCollection(db.name, collName, settings) + + // Восстанавливаем временные метки коллекции если они есть + if timestampsRaw, ok := collData["timestamps"]; ok { + if timestamps, ok := timestampsRaw.(map[string]interface{}); ok { + if createdAt, ok := timestamps["created_at"]; ok { + if v, ok := createdAt.(int64); ok { + coll.metadata.CreatedAt = v + } + } + if updatedAt, ok := timestamps["updated_at"]; ok { + if v, ok := updatedAt.(int64); ok { + coll.metadata.UpdatedAt = v + } + } + } + } + + if docsRaw, ok := collData["documents"]; ok { + if docs, ok := docsRaw.([]interface{}); ok { + for _, docRaw := range docs { + if doc, ok := docRaw.(*Document); ok { + coll.Insert(doc) + } else if docMap, ok := docRaw.(map[string]interface{}); ok { + doc := NewDocument() + if id, ok := docMap["ID"].(string); ok { + doc.ID = id + } else if id, ok := docMap["id"].(string); ok { + doc.ID = id + } + if fields, ok := docMap["fields"]; ok { + if fieldsMap, ok := fields.(map[string]interface{}); ok { + for k, v := range fieldsMap { + doc.SetField(k, v) + } + } + } + if createdAt, ok := docMap["created_at"]; ok { + if v, ok := createdAt.(int64); ok { + doc.CreatedAt = v + } else if v, ok := createdAt.(float64); ok { + doc.CreatedAt = int64(v) + } + } + if updatedAt, ok := docMap["updated_at"]; ok { + if v, ok := updatedAt.(int64); ok { + doc.UpdatedAt = v + } else if v, ok := updatedAt.(float64); ok { + doc.UpdatedAt = int64(v) + } + } + if deletedAt, ok := docMap["deleted_at"]; ok { + if v, ok := deletedAt.(int64); ok { + doc.DeletedAt = v + } else if v, ok := deletedAt.(float64); ok { + doc.DeletedAt = int64(v) + } + } + if version, ok := docMap["version"]; ok { + if v, ok := version.(uint64); ok { + doc.Version = v + } else if v, ok := version.(int64); ok { + doc.Version = uint64(v) + } else if v, ok := version.(float64); ok { + doc.Version = uint64(v) + } + } + coll.Insert(doc) + } + } + } else if docs, ok := docsRaw.([]*Document); ok { + for _, doc := range docs { + coll.Insert(doc) + } + } + } + + db.collections.Store(collName, coll) + AuditCollectionOperation("RESTORE", db.name, collName, settings) + } + + db.UpdateTimestamp() + + return nil +} + +// GetDatabaseNames возвращает имена всех баз данных +func (s *Storage) GetDatabaseNames() []string { + return s.ListDatabases() +} + +// ExistsDatabase проверяет существование базы данных +func (s *Storage) ExistsDatabase(name string) bool { + _, ok := s.databases.Load(name) + return ok +} + +// GetDatabaseCount возвращает количество баз данных +func (s *Storage) GetDatabaseCount() int { + count := 0 + s.databases.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count +} + +// ========== Дополнительные методы для управления хранилищем ========== + +// Backup создаёт резервную копию всех данных +func (s *Storage) Backup(backupPath string) error { + if s.logger != nil { + s.logger.Info(fmt.Sprintf("Starting backup to %s", backupPath)) + } + + backup := make(map[string]interface{}) + backup["created_at"] = s.createdAt + backup["backup_time"] = time.Now().UnixMilli() + + databases := make(map[string][]byte) + + s.databases.Range(func(key, value interface{}) bool { + dbName := key.(string) + db := value.(*Database) + + dbData, err := db.SerializeDatabase() + if err != nil { + if s.logger != nil { + s.logger.Error(fmt.Sprintf("Failed to serialize database %s: %v", dbName, err)) + } + return false + } + + databases[dbName] = dbData + return true + }) + + backup["databases"] = databases + + data, err := serializer.Marshal(backup) + if err != nil { + return fmt.Errorf("failed to marshal backup: %v", err) + } + + if err := os.WriteFile(backupPath, data, 0644); err != nil { + return fmt.Errorf("failed to write backup: %v", err) + } + + if s.logger != nil { + s.logger.Info(fmt.Sprintf("Backup completed: %s", backupPath)) + } + return nil +} + +// Restore восстанавливает данные из резервной копии +func (s *Storage) Restore(backupPath string) error { + if s.logger != nil { + s.logger.Info(fmt.Sprintf("Starting restore from %s", backupPath)) + } + + data, err := os.ReadFile(backupPath) + if err != nil { + return fmt.Errorf("failed to read backup file: %v", err) + } + + var backup map[string]interface{} + if err := serializer.Unmarshal(data, &backup); err != nil { + return fmt.Errorf("failed to unmarshal backup: %v", err) + } + + // Восстанавливаем временную метку хранилища + if createdAt, ok := backup["created_at"]; ok { + if v, ok := createdAt.(int64); ok { + s.createdAt = v + } + } + + databasesRaw, ok := backup["databases"] + if !ok { + return fmt.Errorf("invalid backup format: missing databases") + } + + databases, ok := databasesRaw.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid backup format: databases is not a map") + } + + for dbName, dbDataRaw := range databases { + dbData, ok := dbDataRaw.([]byte) + if !ok { + continue + } + + if !s.ExistsDatabase(dbName) { + if err := s.CreateDatabase(dbName); err != nil { + return fmt.Errorf("failed to create database %s: %v", dbName, err) + } + } + + db, err := s.GetDatabase(dbName) + if err != nil { + return fmt.Errorf("failed to get database %s: %v", dbName, err) + } + + if err := db.DeserializeDatabase(dbData); err != nil { + return fmt.Errorf("failed to restore database %s: %v", dbName, err) + } + } + + if s.logger != nil { + s.logger.Info("Restore completed") + } + return nil +} + +// GetStats возвращает статистику хранилища +func (s *Storage) GetStats() map[string]interface{} { + stats := map[string]interface{}{ + "total_databases": s.GetDatabaseCount(), + "total_documents": s.GetTotalDocuments(), + "page_size_bytes": s.pageSize, + "created_at": s.createdAt, + "created_at_str": time.UnixMilli(s.createdAt).Format("2006-01-02 15:04:05.000"), + } + + databases := make([]map[string]interface{}, 0) + s.databases.Range(func(key, value interface{}) bool { + db := value.(*Database) + dbStats := map[string]interface{}{ + "name": db.name, + "collections": len(db.ListCollections()), + "created_at": db.GetCreatedAt(), + "updated_at": db.GetUpdatedAt(), + "deleted_at": db.GetDeletedAt(), + } + + totalDocs := int64(0) + totalSize := int64(0) + db.collections.Range(func(k, v interface{}) bool { + coll := v.(*Collection) + totalDocs += coll.Count() + totalSize += coll.Size() + return true + }) + + dbStats["documents"] = totalDocs + dbStats["size_bytes"] = totalSize + databases = append(databases, dbStats) + return true + }) + + stats["databases"] = databases + return stats +}