575 lines
20 KiB
Go
575 lines
20 KiB
Go
/*
|
||
* Copyright 2026 Safronov Grigorii
|
||
*
|
||
* Licensed under the CDDL, Version 1.0 (the "License");
|
||
* you may not use this file except in compliance with the License.
|
||
*
|
||
* You may obtain a copy of the License at
|
||
* https://opensource.org/licenses/CDDL-1.0
|
||
*/
|
||
|
||
// Файл: internal/storage/engine.go
|
||
// Назначение: In-memory движок хранения документов с поддержкой коллекций,
|
||
// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей.
|
||
// Полностью wait-free с использованием sync.Map и атомарных операций.
|
||
|
||
package storage
|
||
|
||
import (
|
||
"fmt"
|
||
"os"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"futriis/internal/log"
|
||
"futriis/internal/serializer"
|
||
)
|
||
|
||
// Storage представляет основное хранилище баз данных
|
||
type Storage struct {
|
||
databases sync.Map // map[string]*Database
|
||
pageSize int64
|
||
logger *log.Logger
|
||
totalDocs atomic.Int64
|
||
createdAt int64
|
||
}
|
||
|
||
// Database представляет базу данных (аналог слайса в реляционных СУБД)
|
||
type Database struct {
|
||
name string
|
||
collections sync.Map // map[string]*Collection
|
||
createdAt int64
|
||
updatedAt int64
|
||
deletedAt int64
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewStorage создаёт новый экземпляр хранилища
|
||
func NewStorage(pageSizeMB int, logger *log.Logger) *Storage {
|
||
return &Storage{
|
||
pageSize: int64(pageSizeMB) * 1024 * 1024,
|
||
logger: logger,
|
||
createdAt: time.Now().UnixMilli(),
|
||
}
|
||
}
|
||
|
||
// CreateDatabase создаёт новую базу данных
|
||
func (s *Storage) CreateDatabase(name string) error {
|
||
now := time.Now().UnixMilli()
|
||
db := &Database{
|
||
name: name,
|
||
createdAt: now,
|
||
updatedAt: now,
|
||
deletedAt: 0,
|
||
}
|
||
|
||
if _, exists := s.databases.LoadOrStore(name, db); exists {
|
||
return fmt.Errorf("database already exists")
|
||
}
|
||
AuditDatabaseOperation("CREATE", name)
|
||
if s.logger != nil {
|
||
s.logger.Info("Database created: " + name)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// GetDatabase возвращает базу данных по имени
|
||
func (s *Storage) GetDatabase(name string) (*Database, error) {
|
||
if val, ok := s.databases.Load(name); ok {
|
||
return val.(*Database), nil
|
||
}
|
||
return nil, fmt.Errorf("database not found")
|
||
}
|
||
|
||
// DropDatabase удаляет базу данных
|
||
func (s *Storage) DropDatabase(name string) error {
|
||
if val, ok := s.databases.Load(name); ok {
|
||
db := val.(*Database)
|
||
db.mu.Lock()
|
||
db.deletedAt = time.Now().UnixMilli()
|
||
db.mu.Unlock()
|
||
}
|
||
|
||
if _, ok := s.databases.LoadAndDelete(name); !ok {
|
||
return fmt.Errorf("database not found")
|
||
}
|
||
AuditDatabaseOperation("DROP", name)
|
||
if s.logger != nil {
|
||
s.logger.Info("Database dropped: " + name)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ListDatabases возвращает список всех баз данных
|
||
func (s *Storage) ListDatabases() []string {
|
||
databases := make([]string, 0)
|
||
s.databases.Range(func(key, value interface{}) bool {
|
||
databases = append(databases, key.(string))
|
||
return true
|
||
})
|
||
return databases
|
||
}
|
||
|
||
// Name возвращает имя базы данных
|
||
func (db *Database) Name() string {
|
||
return db.name
|
||
}
|
||
|
||
// GetCreatedAt возвращает время создания базы данных
|
||
func (db *Database) GetCreatedAt() int64 {
|
||
db.mu.RLock()
|
||
defer db.mu.RUnlock()
|
||
return db.createdAt
|
||
}
|
||
|
||
// GetUpdatedAt возвращает время последнего обновления базы данных
|
||
func (db *Database) GetUpdatedAt() int64 {
|
||
db.mu.RLock()
|
||
defer db.mu.RUnlock()
|
||
return db.updatedAt
|
||
}
|
||
|
||
// GetDeletedAt возвращает время удаления базы данных (0 = не удалена)
|
||
func (db *Database) GetDeletedAt() int64 {
|
||
db.mu.RLock()
|
||
defer db.mu.RUnlock()
|
||
return db.deletedAt
|
||
}
|
||
|
||
// UpdateTimestamp обновляет временную метку базы данных
|
||
func (db *Database) UpdateTimestamp() {
|
||
db.mu.Lock()
|
||
defer db.mu.Unlock()
|
||
db.updatedAt = time.Now().UnixMilli()
|
||
}
|
||
|
||
// CreateCollection создаёт новую коллекцию в базе данных
|
||
func (db *Database) CreateCollection(name string) error {
|
||
if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, nil)); exists {
|
||
return fmt.Errorf("collection already exists")
|
||
}
|
||
db.UpdateTimestamp()
|
||
AuditCollectionOperation("CREATE", db.name, name, nil)
|
||
return nil
|
||
}
|
||
|
||
// CreateCollectionWithSettings создаёт коллекцию с настройками
|
||
func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error {
|
||
if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, settings)); exists {
|
||
return fmt.Errorf("collection already exists")
|
||
}
|
||
db.UpdateTimestamp()
|
||
AuditCollectionOperation("CREATE", db.name, name, settings)
|
||
return nil
|
||
}
|
||
|
||
// GetCollection возвращает коллекцию по имени
|
||
func (db *Database) GetCollection(name string) (*Collection, error) {
|
||
if val, ok := db.collections.Load(name); ok {
|
||
return val.(*Collection), nil
|
||
}
|
||
return nil, fmt.Errorf("collection not found")
|
||
}
|
||
|
||
// DropCollection удаляет коллекцию
|
||
func (db *Database) DropCollection(name string) error {
|
||
if _, ok := db.collections.LoadAndDelete(name); !ok {
|
||
return fmt.Errorf("collection not found")
|
||
}
|
||
db.UpdateTimestamp()
|
||
AuditCollectionOperation("DROP", db.name, name, nil)
|
||
return nil
|
||
}
|
||
|
||
// ListCollections возвращает список всех коллекций в базе данных
|
||
func (db *Database) ListCollections() []string {
|
||
collections := make([]string, 0)
|
||
db.collections.Range(func(key, value interface{}) bool {
|
||
collections = append(collections, key.(string))
|
||
return true
|
||
})
|
||
return collections
|
||
}
|
||
|
||
// GetTotalDocuments возвращает общее количество документов во всех коллекциях
|
||
func (s *Storage) GetTotalDocuments() int64 {
|
||
return s.totalDocs.Load()
|
||
}
|
||
|
||
// GetPageSize возвращает размер страницы памяти
|
||
func (s *Storage) GetPageSize() int64 {
|
||
return s.pageSize
|
||
}
|
||
|
||
// GetCreatedAt возвращает время создания хранилища
|
||
func (s *Storage) GetCreatedAt() int64 {
|
||
return s.createdAt
|
||
}
|
||
|
||
// SerializeDatabase сериализует всю базу данных в MessagePack
|
||
func (db *Database) SerializeDatabase() ([]byte, error) {
|
||
dbData := make(map[string]interface{})
|
||
|
||
db.collections.Range(func(key, value interface{}) bool {
|
||
coll := value.(*Collection)
|
||
collData := make(map[string]interface{})
|
||
|
||
docs := coll.GetAllDocuments()
|
||
collDocs := make([]*Document, 0, len(docs))
|
||
for _, doc := range docs {
|
||
collDocs = append(collDocs, doc)
|
||
}
|
||
collData["documents"] = collDocs
|
||
collData["metadata"] = coll.GetMetadata()
|
||
collData["timestamps"] = coll.GetTimestamps()
|
||
|
||
dbData[key.(string)] = collData
|
||
return true
|
||
})
|
||
|
||
// Добавляем метаданные базы данных
|
||
dbData["_metadata"] = map[string]interface{}{
|
||
"name": db.name,
|
||
"created_at": db.GetCreatedAt(),
|
||
"updated_at": db.GetUpdatedAt(),
|
||
"exported_at": time.Now().UnixMilli(),
|
||
}
|
||
|
||
return serializer.Marshal(dbData)
|
||
}
|
||
|
||
// DeserializeDatabase десериализует базу данных из MessagePack
|
||
func (db *Database) DeserializeDatabase(data []byte) error {
|
||
var dbData map[string]interface{}
|
||
if err := serializer.Unmarshal(data, &dbData); err != nil {
|
||
return err
|
||
}
|
||
|
||
// Извлекаем метаданные
|
||
if metaRaw, ok := dbData["_metadata"]; ok {
|
||
if meta, ok := metaRaw.(map[string]interface{}); ok {
|
||
if createdAt, ok := meta["created_at"]; ok {
|
||
if v, ok := createdAt.(int64); ok {
|
||
db.createdAt = v
|
||
}
|
||
}
|
||
}
|
||
delete(dbData, "_metadata")
|
||
}
|
||
|
||
for collName, collDataRaw := range dbData {
|
||
collData, ok := collDataRaw.(map[string]interface{})
|
||
if !ok {
|
||
continue
|
||
}
|
||
|
||
settings := &CollectionSettings{
|
||
MaxDocuments: 0,
|
||
ValidateSchema: false,
|
||
AutoIndexID: true,
|
||
TTLSeconds: 0,
|
||
SoftDelete: false,
|
||
}
|
||
|
||
if metaRaw, ok := collData["metadata"]; ok {
|
||
if meta, ok := metaRaw.(map[string]interface{}); ok {
|
||
if settingsRaw, ok := meta["settings"]; ok {
|
||
if settingsMap, ok := settingsRaw.(map[string]interface{}); ok {
|
||
if maxDocs, ok := settingsMap["max_documents"]; ok {
|
||
if v, ok := maxDocs.(int); ok {
|
||
settings.MaxDocuments = v
|
||
} else if v, ok := maxDocs.(int64); ok {
|
||
settings.MaxDocuments = int(v)
|
||
} else if v, ok := maxDocs.(float64); ok {
|
||
settings.MaxDocuments = int(v)
|
||
}
|
||
}
|
||
if validateSchema, ok := settingsMap["validate_schema"]; ok {
|
||
if v, ok := validateSchema.(bool); ok {
|
||
settings.ValidateSchema = v
|
||
}
|
||
}
|
||
if autoIndexID, ok := settingsMap["auto_index_id"]; ok {
|
||
if v, ok := autoIndexID.(bool); ok {
|
||
settings.AutoIndexID = v
|
||
}
|
||
}
|
||
if ttlSeconds, ok := settingsMap["ttl_seconds"]; ok {
|
||
if v, ok := ttlSeconds.(int); ok {
|
||
settings.TTLSeconds = v
|
||
} else if v, ok := ttlSeconds.(int64); ok {
|
||
settings.TTLSeconds = int(v)
|
||
} else if v, ok := ttlSeconds.(float64); ok {
|
||
settings.TTLSeconds = int(v)
|
||
}
|
||
}
|
||
if softDelete, ok := settingsMap["soft_delete"]; ok {
|
||
if v, ok := softDelete.(bool); ok {
|
||
settings.SoftDelete = v
|
||
}
|
||
}
|
||
}
|
||
}
|
||
} else if meta, ok := metaRaw.(*CollectionMetadata); ok {
|
||
if meta.Settings != nil {
|
||
settings = meta.Settings
|
||
}
|
||
}
|
||
}
|
||
|
||
coll := NewCollection(db.name, collName, settings)
|
||
|
||
// Восстанавливаем временные метки коллекции если они есть
|
||
if timestampsRaw, ok := collData["timestamps"]; ok {
|
||
if timestamps, ok := timestampsRaw.(map[string]interface{}); ok {
|
||
if createdAt, ok := timestamps["created_at"]; ok {
|
||
if v, ok := createdAt.(int64); ok {
|
||
coll.metadata.CreatedAt = v
|
||
}
|
||
}
|
||
if updatedAt, ok := timestamps["updated_at"]; ok {
|
||
if v, ok := updatedAt.(int64); ok {
|
||
coll.metadata.UpdatedAt = v
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if docsRaw, ok := collData["documents"]; ok {
|
||
if docs, ok := docsRaw.([]interface{}); ok {
|
||
for _, docRaw := range docs {
|
||
if doc, ok := docRaw.(*Document); ok {
|
||
coll.Insert(doc)
|
||
} else if docMap, ok := docRaw.(map[string]interface{}); ok {
|
||
doc := NewDocument()
|
||
if id, ok := docMap["ID"].(string); ok {
|
||
doc.ID = id
|
||
} else if id, ok := docMap["id"].(string); ok {
|
||
doc.ID = id
|
||
}
|
||
if fields, ok := docMap["fields"]; ok {
|
||
if fieldsMap, ok := fields.(map[string]interface{}); ok {
|
||
for k, v := range fieldsMap {
|
||
doc.SetField(k, v)
|
||
}
|
||
}
|
||
}
|
||
if createdAt, ok := docMap["created_at"]; ok {
|
||
if v, ok := createdAt.(int64); ok {
|
||
doc.CreatedAt = v
|
||
} else if v, ok := createdAt.(float64); ok {
|
||
doc.CreatedAt = int64(v)
|
||
}
|
||
}
|
||
if updatedAt, ok := docMap["updated_at"]; ok {
|
||
if v, ok := updatedAt.(int64); ok {
|
||
doc.UpdatedAt = v
|
||
} else if v, ok := updatedAt.(float64); ok {
|
||
doc.UpdatedAt = int64(v)
|
||
}
|
||
}
|
||
if deletedAt, ok := docMap["deleted_at"]; ok {
|
||
if v, ok := deletedAt.(int64); ok {
|
||
doc.DeletedAt = v
|
||
} else if v, ok := deletedAt.(float64); ok {
|
||
doc.DeletedAt = int64(v)
|
||
}
|
||
}
|
||
if version, ok := docMap["version"]; ok {
|
||
if v, ok := version.(uint64); ok {
|
||
doc.Version = v
|
||
} else if v, ok := version.(int64); ok {
|
||
doc.Version = uint64(v)
|
||
} else if v, ok := version.(float64); ok {
|
||
doc.Version = uint64(v)
|
||
}
|
||
}
|
||
coll.Insert(doc)
|
||
}
|
||
}
|
||
} else if docs, ok := docsRaw.([]*Document); ok {
|
||
for _, doc := range docs {
|
||
coll.Insert(doc)
|
||
}
|
||
}
|
||
}
|
||
|
||
db.collections.Store(collName, coll)
|
||
AuditCollectionOperation("RESTORE", db.name, collName, settings)
|
||
}
|
||
|
||
db.UpdateTimestamp()
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetDatabaseNames возвращает имена всех баз данных
|
||
func (s *Storage) GetDatabaseNames() []string {
|
||
return s.ListDatabases()
|
||
}
|
||
|
||
// ExistsDatabase проверяет существование базы данных
|
||
func (s *Storage) ExistsDatabase(name string) bool {
|
||
_, ok := s.databases.Load(name)
|
||
return ok
|
||
}
|
||
|
||
// GetDatabaseCount возвращает количество баз данных
|
||
func (s *Storage) GetDatabaseCount() int {
|
||
count := 0
|
||
s.databases.Range(func(key, value interface{}) bool {
|
||
count++
|
||
return true
|
||
})
|
||
return count
|
||
}
|
||
|
||
// ========== Дополнительные методы для управления хранилищем ==========
|
||
|
||
// Backup создаёт резервную копию всех данных
|
||
func (s *Storage) Backup(backupPath string) error {
|
||
if s.logger != nil {
|
||
s.logger.Info(fmt.Sprintf("Starting backup to %s", backupPath))
|
||
}
|
||
|
||
backup := make(map[string]interface{})
|
||
backup["created_at"] = s.createdAt
|
||
backup["backup_time"] = time.Now().UnixMilli()
|
||
|
||
databases := make(map[string][]byte)
|
||
|
||
s.databases.Range(func(key, value interface{}) bool {
|
||
dbName := key.(string)
|
||
db := value.(*Database)
|
||
|
||
dbData, err := db.SerializeDatabase()
|
||
if err != nil {
|
||
if s.logger != nil {
|
||
s.logger.Error(fmt.Sprintf("Failed to serialize database %s: %v", dbName, err))
|
||
}
|
||
return false
|
||
}
|
||
|
||
databases[dbName] = dbData
|
||
return true
|
||
})
|
||
|
||
backup["databases"] = databases
|
||
|
||
data, err := serializer.Marshal(backup)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to marshal backup: %v", err)
|
||
}
|
||
|
||
if err := os.WriteFile(backupPath, data, 0644); err != nil {
|
||
return fmt.Errorf("failed to write backup: %v", err)
|
||
}
|
||
|
||
if s.logger != nil {
|
||
s.logger.Info(fmt.Sprintf("Backup completed: %s", backupPath))
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Restore восстанавливает данные из резервной копии
|
||
func (s *Storage) Restore(backupPath string) error {
|
||
if s.logger != nil {
|
||
s.logger.Info(fmt.Sprintf("Starting restore from %s", backupPath))
|
||
}
|
||
|
||
data, err := os.ReadFile(backupPath)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to read backup file: %v", err)
|
||
}
|
||
|
||
var backup map[string]interface{}
|
||
if err := serializer.Unmarshal(data, &backup); err != nil {
|
||
return fmt.Errorf("failed to unmarshal backup: %v", err)
|
||
}
|
||
|
||
// Восстанавливаем временную метку хранилища
|
||
if createdAt, ok := backup["created_at"]; ok {
|
||
if v, ok := createdAt.(int64); ok {
|
||
s.createdAt = v
|
||
}
|
||
}
|
||
|
||
databasesRaw, ok := backup["databases"]
|
||
if !ok {
|
||
return fmt.Errorf("invalid backup format: missing databases")
|
||
}
|
||
|
||
databases, ok := databasesRaw.(map[string]interface{})
|
||
if !ok {
|
||
return fmt.Errorf("invalid backup format: databases is not a map")
|
||
}
|
||
|
||
for dbName, dbDataRaw := range databases {
|
||
dbData, ok := dbDataRaw.([]byte)
|
||
if !ok {
|
||
continue
|
||
}
|
||
|
||
if !s.ExistsDatabase(dbName) {
|
||
if err := s.CreateDatabase(dbName); err != nil {
|
||
return fmt.Errorf("failed to create database %s: %v", dbName, err)
|
||
}
|
||
}
|
||
|
||
db, err := s.GetDatabase(dbName)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to get database %s: %v", dbName, err)
|
||
}
|
||
|
||
if err := db.DeserializeDatabase(dbData); err != nil {
|
||
return fmt.Errorf("failed to restore database %s: %v", dbName, err)
|
||
}
|
||
}
|
||
|
||
if s.logger != nil {
|
||
s.logger.Info("Restore completed")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// GetStats возвращает статистику хранилища
|
||
func (s *Storage) GetStats() map[string]interface{} {
|
||
stats := map[string]interface{}{
|
||
"total_databases": s.GetDatabaseCount(),
|
||
"total_documents": s.GetTotalDocuments(),
|
||
"page_size_bytes": s.pageSize,
|
||
"created_at": s.createdAt,
|
||
"created_at_str": time.UnixMilli(s.createdAt).Format("2006-01-02 15:04:05.000"),
|
||
}
|
||
|
||
databases := make([]map[string]interface{}, 0)
|
||
s.databases.Range(func(key, value interface{}) bool {
|
||
db := value.(*Database)
|
||
dbStats := map[string]interface{}{
|
||
"name": db.name,
|
||
"collections": len(db.ListCollections()),
|
||
"created_at": db.GetCreatedAt(),
|
||
"updated_at": db.GetUpdatedAt(),
|
||
"deleted_at": db.GetDeletedAt(),
|
||
}
|
||
|
||
totalDocs := int64(0)
|
||
totalSize := int64(0)
|
||
db.collections.Range(func(k, v interface{}) bool {
|
||
coll := v.(*Collection)
|
||
totalDocs += coll.Count()
|
||
totalSize += coll.Size()
|
||
return true
|
||
})
|
||
|
||
dbStats["documents"] = totalDocs
|
||
dbStats["size_bytes"] = totalSize
|
||
databases = append(databases, dbStats)
|
||
return true
|
||
})
|
||
|
||
stats["databases"] = databases
|
||
return stats
|
||
}
|